| package fscache // import "github.com/docker/docker/builder/fscache" |
| |
| import ( |
| "archive/tar" |
| "context" |
| "crypto/sha256" |
| "encoding/json" |
| "hash" |
| "os" |
| "path/filepath" |
| "sort" |
| "sync" |
| "time" |
| |
| "github.com/boltdb/bolt" |
| "github.com/docker/docker/builder" |
| "github.com/docker/docker/builder/remotecontext" |
| "github.com/docker/docker/pkg/archive" |
| "github.com/docker/docker/pkg/directory" |
| "github.com/docker/docker/pkg/stringid" |
| "github.com/docker/docker/pkg/tarsum" |
| "github.com/moby/buildkit/session/filesync" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "github.com/tonistiigi/fsutil" |
| "golang.org/x/sync/singleflight" |
| ) |
| |
// Names used for the on-disk metadata database and its bucket keys.
const (
	dbFile   = "fscache.db" // bolt database filename created under Opt.Root
	cacheKey = "cache"      // per-source bucket key holding serialized cache records
	metaKey  = "meta"       // per-source bucket key holding JSON-encoded sourceMeta
)
| |
| // Backend is a backing implementation for FSCache |
// Backend is a backing implementation for FSCache
type Backend interface {
	// Get returns the directory path for the snapshot with the given id.
	// It is called both for freshly generated ids (New) and for ids
	// restored from the metadata db, so it presumably creates missing
	// snapshots — confirm against implementations.
	Get(id string) (string, error)
	// Remove deletes the snapshot with the given id from the backend.
	Remove(id string) error
}
| |
| // FSCache allows syncing remote resources to cached snapshots |
// FSCache allows syncing remote resources to cached snapshots
type FSCache struct {
	opt        Opt
	transports map[string]Transport // registered transfer methods, keyed by transport id
	mu         sync.Mutex           // guards transports
	g          singleflight.Group   // deduplicates concurrent SyncFrom calls per cache key
	store      *fsCacheStore        // persistent snapshot metadata store
}
| |
| // Opt defines options for initializing FSCache |
// Opt defines options for initializing FSCache
type Opt struct {
	Backend  Backend  // storage backend providing snapshot directories
	Root     string   // for storing local metadata
	GCPolicy GCPolicy // limits enforced by the periodic garbage collector
}
| |
| // GCPolicy defines policy for garbage collection |
// GCPolicy defines policy for garbage collection
type GCPolicy struct {
	MaxSize         uint64        // max total bytes retained for unreferenced sources
	MaxKeepDuration time.Duration // unreferenced sources unused longer than this are deleted
}
| |
| // NewFSCache returns new FSCache object |
| func NewFSCache(opt Opt) (*FSCache, error) { |
| store, err := newFSCacheStore(opt) |
| if err != nil { |
| return nil, err |
| } |
| return &FSCache{ |
| store: store, |
| opt: opt, |
| transports: make(map[string]Transport), |
| }, nil |
| } |
| |
| // Transport defines a method for syncing remote data to FSCache |
// Transport defines a method for syncing remote data to FSCache
type Transport interface {
	// Copy transfers the data identified by id into dest, reporting
	// file changes and cache-support status through cs.
	Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error
}
| |
| // RemoteIdentifier identifies a transfer request |
// RemoteIdentifier identifies a transfer request
type RemoteIdentifier interface {
	Key() string       // unique cache key for this request
	SharedKey() string // optional key for rebasing onto a compatible unused cache
	Transport() string // name of the registered transport to use
}
| |
| // RegisterTransport registers a new transport method |
| func (fsc *FSCache) RegisterTransport(id string, transport Transport) error { |
| fsc.mu.Lock() |
| defer fsc.mu.Unlock() |
| if _, ok := fsc.transports[id]; ok { |
| return errors.Errorf("transport %v already exists", id) |
| } |
| fsc.transports[id] = transport |
| return nil |
| } |
| |
| // SyncFrom returns a source based on a remote identifier |
| func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt |
| trasportID := id.Transport() |
| fsc.mu.Lock() |
| transport, ok := fsc.transports[id.Transport()] |
| if !ok { |
| fsc.mu.Unlock() |
| return nil, errors.Errorf("invalid transport %s", trasportID) |
| } |
| |
| logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey()) |
| fsc.mu.Unlock() |
| sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) { |
| var sourceRef *cachedSourceRef |
| sourceRef, err := fsc.store.Get(id.Key()) |
| if err == nil { |
| return sourceRef, nil |
| } |
| |
| // check for unused shared cache |
| sharedKey := id.SharedKey() |
| if sharedKey != "" { |
| r, err := fsc.store.Rebase(sharedKey, id.Key()) |
| if err == nil { |
| sourceRef = r |
| } |
| } |
| |
| if sourceRef == nil { |
| var err error |
| sourceRef, err = fsc.store.New(id.Key(), sharedKey) |
| if err != nil { |
| return nil, errors.Wrap(err, "failed to create remote context") |
| } |
| } |
| |
| if err := syncFrom(ctx, sourceRef, transport, id); err != nil { |
| sourceRef.Release() |
| return nil, err |
| } |
| if err := sourceRef.resetSize(-1); err != nil { |
| return nil, err |
| } |
| return sourceRef, nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| ref := sourceRef.(*cachedSourceRef) |
| if ref.src == nil { // failsafe |
| return nil, errors.Errorf("invalid empty pull") |
| } |
| wc := &wrappedContext{Source: ref.src, closer: func() error { |
| ref.Release() |
| return nil |
| }} |
| return wc, nil |
| } |
| |
| // DiskUsage reports how much data is allocated by the cache |
| func (fsc *FSCache) DiskUsage(ctx context.Context) (int64, error) { |
| return fsc.store.DiskUsage(ctx) |
| } |
| |
| // Prune allows manually cleaning up the cache |
| func (fsc *FSCache) Prune(ctx context.Context) (uint64, error) { |
| return fsc.store.Prune(ctx) |
| } |
| |
| // Close stops the gc and closes the persistent db |
| func (fsc *FSCache) Close() error { |
| return fsc.store.Close() |
| } |
| |
// syncFrom transfers remote data for id into cs's directory via transport,
// reusing previously persisted cache records when available, and persists
// the updated records afterwards.
func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) {
	src := cs.src
	if src == nil {
		// first sync for this ref: track changes over the snapshot dir
		src = remotecontext.NewCachableSource(cs.Dir())
	}

	if !cs.cached {
		// load persisted cache records, or scan the directory if none exist
		if err := cs.storage.db.View(func(tx *bolt.Tx) error {
			b := tx.Bucket([]byte(id.Key()))
			dt := b.Get([]byte(cacheKey))
			if dt != nil {
				if err := src.UnmarshalBinary(dt); err != nil {
					return err
				}
			} else {
				return errors.Wrap(src.Scan(), "failed to scan cache records")
			}
			return nil
		}); err != nil {
			return err
		}
	}

	dc := &detectChanges{f: src.HandleChange}

	// todo: probably send a bucket to `Copy` and let it return source
	// but need to make sure that tx is safe
	if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil {
		return errors.Wrapf(err, "failed to copy to %s", cs.Dir())
	}

	if !dc.supported {
		// transport did not report incremental changes; rescan from disk
		if err := src.Scan(); err != nil {
			return errors.Wrap(err, "failed to scan cache records after transfer")
		}
	}
	cs.cached = true
	cs.src = src
	// persist the refreshed cache records for the next sync
	return cs.storage.db.Update(func(tx *bolt.Tx) error {
		dt, err := src.MarshalBinary()
		if err != nil {
			return err
		}
		b := tx.Bucket([]byte(id.Key()))
		return b.Put([]byte(cacheKey), dt)
	})
}
| |
// fsCacheStore is the persistent store backing FSCache: an in-memory index
// of cached sources plus a bolt database for their metadata.
type fsCacheStore struct {
	mu       sync.Mutex // guards sources; held across New/Rebase/Get/GC/Prune/delete
	sources  map[string]*cachedSource
	db       *bolt.DB // persistent metadata and cache records
	fs       Backend
	gcTimer  *time.Timer // periodic GC trigger; stopped by Close
	gcPolicy GCPolicy
}
| |
| // CachePolicy defines policy for keeping a resource in cache |
| type CachePolicy struct { |
| Priority int |
| LastUsed time.Time |
| } |
| |
| func defaultCachePolicy() CachePolicy { |
| return CachePolicy{Priority: 10, LastUsed: time.Now()} |
| } |
| |
| func newFSCacheStore(opt Opt) (*fsCacheStore, error) { |
| if err := os.MkdirAll(opt.Root, 0700); err != nil { |
| return nil, err |
| } |
| p := filepath.Join(opt.Root, dbFile) |
| db, err := bolt.Open(p, 0600, nil) |
| if err != nil { |
| return nil, errors.Wrap(err, "failed to open database file %s") |
| } |
| s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy} |
| db.View(func(tx *bolt.Tx) error { |
| return tx.ForEach(func(name []byte, b *bolt.Bucket) error { |
| dt := b.Get([]byte(metaKey)) |
| if dt == nil { |
| return nil |
| } |
| var sm sourceMeta |
| if err := json.Unmarshal(dt, &sm); err != nil { |
| return err |
| } |
| dir, err := s.fs.Get(sm.BackendID) |
| if err != nil { |
| return err // TODO: handle gracefully |
| } |
| source := &cachedSource{ |
| refs: make(map[*cachedSourceRef]struct{}), |
| id: string(name), |
| dir: dir, |
| sourceMeta: sm, |
| storage: s, |
| } |
| s.sources[string(name)] = source |
| return nil |
| }) |
| }) |
| |
| s.gcTimer = s.startPeriodicGC(5 * time.Minute) |
| return s, nil |
| } |
| |
// startPeriodicGC arranges for GC to run every interval until the returned
// timer is stopped. The callback re-arms the timer itself after each run,
// so runs never overlap.
func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer {
	var t *time.Timer
	t = time.AfterFunc(interval, func() {
		if err := s.GC(); err != nil {
			logrus.Errorf("build gc error: %v", err)
		}
		// safe to use t here: the callback only fires after AfterFunc
		// has returned and t has been assigned
		t.Reset(interval)
	})
	return t
}
| |
| func (s *fsCacheStore) Close() error { |
| s.gcTimer.Stop() |
| return s.db.Close() |
| } |
| |
// New creates a cached source for id backed by a fresh backend snapshot and
// returns a reference to it. sharedKey (may be empty) allows the source to
// be reused later via Rebase.
func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	if err := s.db.Update(func(tx *bolt.Tx) error {
		// CreateBucket errors if id already exists, guaranteeing uniqueness
		b, err := tx.CreateBucket([]byte(id))
		if err != nil {
			return err
		}
		// allocate a fresh backend snapshot under a random id
		backendID := stringid.GenerateRandomID()
		dir, err := s.fs.Get(backendID)
		if err != nil {
			return err
		}
		source := &cachedSource{
			refs: make(map[*cachedSourceRef]struct{}),
			id:   id,
			dir:  dir,
			sourceMeta: sourceMeta{
				BackendID:   backendID,
				SharedKey:   sharedKey,
				CachePolicy: defaultCachePolicy(),
			},
			storage: s,
		}
		// persist metadata inside the same transaction that created the bucket
		dt, err := json.Marshal(source.sourceMeta)
		if err != nil {
			return err
		}
		if err := b.Put([]byte(metaKey), dt); err != nil {
			return err
		}
		s.sources[id] = source
		ret = source
		return nil
	}); err != nil {
		return nil, err
	}
	return ret.getRef(), nil
}
| |
// Rebase takes over the first unreferenced cached source whose SharedKey
// matches sharedKey, renames it to newid (moving its bolt bucket) and resets
// its cache policy. This lets a new cache key reuse compatible existing data
// instead of pulling from scratch.
func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var ret *cachedSource
	for id, snap := range s.sources {
		// only unreferenced sources with a matching shared key qualify
		if snap.SharedKey == sharedKey && len(snap.refs) == 0 {
			if err := s.db.Update(func(tx *bolt.Tx) error {
				// move the bucket: drop the old one, create the new one
				if err := tx.DeleteBucket([]byte(id)); err != nil {
					return err
				}
				b, err := tx.CreateBucket([]byte(newid))
				if err != nil {
					return err
				}
				snap.id = newid
				snap.CachePolicy = defaultCachePolicy()
				dt, err := json.Marshal(snap.sourceMeta)
				if err != nil {
					return err
				}
				if err := b.Put([]byte(metaKey), dt); err != nil {
					return err
				}
				delete(s.sources, id)
				s.sources[newid] = snap
				return nil
			}); err != nil {
				return nil, err
			}
			ret = snap
			break
		}
	}
	if ret == nil {
		return nil, errors.Errorf("no candidate for rebase")
	}
	return ret.getRef(), nil
}
| |
| func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| src, ok := s.sources[id] |
| if !ok { |
| return nil, errors.Errorf("not found") |
| } |
| return src.getRef(), nil |
| } |
| |
| // DiskUsage reports how much data is allocated by the cache |
| func (s *fsCacheStore) DiskUsage(ctx context.Context) (int64, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| var size int64 |
| |
| for _, snap := range s.sources { |
| if len(snap.refs) == 0 { |
| ss, err := snap.getSize(ctx) |
| if err != nil { |
| return 0, err |
| } |
| size += ss |
| } |
| } |
| return size, nil |
| } |
| |
| // Prune allows manually cleaning up the cache |
| func (s *fsCacheStore) Prune(ctx context.Context) (uint64, error) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| var size uint64 |
| |
| for id, snap := range s.sources { |
| select { |
| case <-ctx.Done(): |
| logrus.Debugf("Cache prune operation cancelled, pruned size: %d", size) |
| // when the context is cancelled, only return current size and nil |
| return size, nil |
| default: |
| } |
| if len(snap.refs) == 0 { |
| ss, err := snap.getSize(ctx) |
| if err != nil { |
| return size, err |
| } |
| if err := s.delete(id); err != nil { |
| return size, errors.Wrapf(err, "failed to delete %s", id) |
| } |
| size += uint64(ss) |
| } |
| } |
| return size, nil |
| } |
| |
| // GC runs a garbage collector on FSCache |
| func (s *fsCacheStore) GC() error { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| var size uint64 |
| |
| ctx := context.Background() |
| cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration) |
| var blacklist []*cachedSource |
| |
| for id, snap := range s.sources { |
| if len(snap.refs) == 0 { |
| if cutoff.After(snap.CachePolicy.LastUsed) { |
| if err := s.delete(id); err != nil { |
| return errors.Wrapf(err, "failed to delete %s", id) |
| } |
| } else { |
| ss, err := snap.getSize(ctx) |
| if err != nil { |
| return err |
| } |
| size += uint64(ss) |
| blacklist = append(blacklist, snap) |
| } |
| } |
| } |
| |
| sort.Sort(sortableCacheSources(blacklist)) |
| for _, snap := range blacklist { |
| if size <= s.gcPolicy.MaxSize { |
| break |
| } |
| ss, err := snap.getSize(ctx) |
| if err != nil { |
| return err |
| } |
| if err := s.delete(snap.id); err != nil { |
| return errors.Wrapf(err, "failed to delete %s", snap.id) |
| } |
| size -= uint64(ss) |
| } |
| return nil |
| } |
| |
| // keep mu while calling this |
| func (s *fsCacheStore) delete(id string) error { |
| src, ok := s.sources[id] |
| if !ok { |
| return nil |
| } |
| if len(src.refs) > 0 { |
| return errors.Errorf("can't delete %s because it has active references", id) |
| } |
| delete(s.sources, id) |
| if err := s.db.Update(func(tx *bolt.Tx) error { |
| return tx.DeleteBucket([]byte(id)) |
| }); err != nil { |
| return err |
| } |
| return s.fs.Remove(src.BackendID) |
| } |
| |
// sourceMeta is the per-source metadata JSON-persisted under metaKey.
type sourceMeta struct {
	SharedKey   string // key used by Rebase to find reusable caches
	BackendID   string // id of the snapshot directory in the Backend
	CachePolicy CachePolicy
	Size        int64 // cached directory size in bytes; negative means unknown
}
| |
// cachedSource is the in-memory handle for one cached snapshot.
type cachedSource struct {
	sourceMeta
	refs    map[*cachedSourceRef]struct{} // live references; empty means eligible for GC
	id      string                        // bucket name in the bolt db
	dir     string                        // snapshot directory returned by the backend
	src     *remotecontext.CachableSource
	storage *fsCacheStore
	cached  bool // keep track if cache is up to date
}
| |
// cachedSourceRef is a counted reference to a cachedSource; drop it with Release.
type cachedSourceRef struct {
	*cachedSource
}
| |
| func (cs *cachedSource) Dir() string { |
| return cs.dir |
| } |
| |
| // hold storage lock before calling |
| func (cs *cachedSource) getRef() *cachedSourceRef { |
| ref := &cachedSourceRef{cachedSource: cs} |
| cs.refs[ref] = struct{}{} |
| return ref |
| } |
| |
| // hold storage lock before calling |
| func (cs *cachedSource) getSize(ctx context.Context) (int64, error) { |
| if cs.sourceMeta.Size < 0 { |
| ss, err := directory.Size(ctx, cs.dir) |
| if err != nil { |
| return 0, err |
| } |
| if err := cs.resetSize(ss); err != nil { |
| return 0, err |
| } |
| return ss, nil |
| } |
| return cs.sourceMeta.Size, nil |
| } |
| |
| func (cs *cachedSource) resetSize(val int64) error { |
| cs.sourceMeta.Size = val |
| return cs.saveMeta() |
| } |
| func (cs *cachedSource) saveMeta() error { |
| return cs.storage.db.Update(func(tx *bolt.Tx) error { |
| b := tx.Bucket([]byte(cs.id)) |
| dt, err := json.Marshal(cs.sourceMeta) |
| if err != nil { |
| return err |
| } |
| return b.Put([]byte(metaKey), dt) |
| }) |
| } |
| |
| func (csr *cachedSourceRef) Release() error { |
| csr.cachedSource.storage.mu.Lock() |
| defer csr.cachedSource.storage.mu.Unlock() |
| delete(csr.cachedSource.refs, csr) |
| if len(csr.cachedSource.refs) == 0 { |
| go csr.cachedSource.storage.GC() |
| } |
| return nil |
| } |
| |
// detectChanges adapts a fsutil change callback into the CacheUpdater
// interface handed to transports.
type detectChanges struct {
	f         fsutil.ChangeFunc
	supported bool // set via MarkSupported when the transport reports change detection
}
| |
| func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error { |
| if dc == nil { |
| return nil |
| } |
| return dc.f(kind, path, fi, err) |
| } |
| |
| func (dc *detectChanges) MarkSupported(v bool) { |
| if dc == nil { |
| return |
| } |
| dc.supported = v |
| } |
| |
// ContentHasher returns the constructor used to hash file contents
// (tarsum-style, header plus content — see newTarsumHash).
func (dc *detectChanges) ContentHasher() fsutil.ContentHasher {
	return newTarsumHash
}
| |
// wrappedContext wraps a builder.Source so extra cleanup runs on Close.
type wrappedContext struct {
	builder.Source
	closer func() error // invoked after the underlying source closes cleanly
}
| |
| func (wc *wrappedContext) Close() error { |
| if err := wc.Source.Close(); err != nil { |
| return err |
| } |
| return wc.closer() |
| } |
| |
| type sortableCacheSources []*cachedSource |
| |
| // Len is the number of elements in the collection. |
| func (s sortableCacheSources) Len() int { |
| return len(s) |
| } |
| |
| // Less reports whether the element with |
| // index i should sort before the element with index j. |
| func (s sortableCacheSources) Less(i, j int) bool { |
| return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed) |
| } |
| |
| // Swap swaps the elements with indexes i and j. |
| func (s sortableCacheSources) Swap(i, j int) { |
| s[i], s[j] = s[j], s[i] |
| } |
| |
// newTarsumHash returns a hash.Hash for the file described by stat: the
// hash is seeded with the file's tar header (see tarsumHash.Reset) and the
// caller then streams the file contents into it.
func newTarsumHash(stat *fsutil.Stat) (hash.Hash, error) {
	fi := &fsutil.StatInfo{stat}
	p := stat.Path
	if fi.IsDir() {
		// directories get a trailing separator, matching tar naming
		p += string(os.PathSeparator)
	}
	h, err := archive.FileInfoHeader(p, fi, stat.Linkname)
	if err != nil {
		return nil, err
	}
	// carry identity fields from stat into the header explicitly
	h.Name = p
	h.Uid = int(stat.Uid)
	h.Gid = int(stat.Gid)
	h.Linkname = stat.Linkname
	if stat.Xattrs != nil {
		h.Xattrs = make(map[string]string)
		for k, v := range stat.Xattrs {
			h.Xattrs[k] = string(v)
		}
	}

	tsh := &tarsumHash{h: h, Hash: sha256.New()}
	tsh.Reset() // seed the digest with the header bytes
	return tsh, nil
}
| |
| // Reset resets the Hash to its initial state. |
| func (tsh *tarsumHash) Reset() { |
| tsh.Hash.Reset() |
| tarsum.WriteV1Header(tsh.h, tsh.Hash) |
| } |
| |
// tarsumHash hashes a tar header followed by file content (tarsum v1 style).
type tarsumHash struct {
	hash.Hash
	h *tar.Header // header re-written into the hash on every Reset
}