blob: 0fb0103e5088e769b4d64555e0a5f4902250ccb2 [file] [log] [blame]
package cache
import (
"context"
"sort"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/leases"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/flightcontrol"
digest "github.com/opencontainers/go-digest"
imagespecidentity "github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
var (
ErrLocked = errors.New("locked")
errNotFound = errors.New("not found")
errInvalid = errors.New("invalid")
)
type ManagerOpt struct {
Snapshotter snapshot.Snapshotter
MetadataStore *metadata.Store
ContentStore content.Store
LeaseManager leases.Manager
PruneRefChecker ExternalRefCheckerFunc
GarbageCollect func(ctx context.Context) (gc.Stats, error)
Applier diff.Applier
Differ diff.Comparer
}
type Accessor interface {
GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error)
Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error)
New(ctx context.Context, parent ImmutableRef, s session.Group, opts ...RefOption) (MutableRef, error)
GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase?
IdentityMapping() *idtools.IdentityMapping
Metadata(string) *metadata.StorageItem
}
type Controller interface {
DiskUsage(ctx context.Context, info client.DiskUsageInfo) ([]*client.UsageInfo, error)
Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error
}
type Manager interface {
Accessor
Controller
Close() error
}
type ExternalRefCheckerFunc func() (ExternalRefChecker, error)
type ExternalRefChecker interface {
Exists(string, []digest.Digest) bool
}
type cacheManager struct {
records map[string]*cacheRecord
mu sync.Mutex
ManagerOpt
md *metadata.Store
muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results
unlazyG flightcontrol.Group
}
func NewManager(opt ManagerOpt) (Manager, error) {
cm := &cacheManager{
ManagerOpt: opt,
md: opt.MetadataStore,
records: make(map[string]*cacheRecord),
}
if err := cm.init(context.TODO()); err != nil {
return nil, err
}
// cm.scheduleGC(5 * time.Minute)
return cm, nil
}
func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispec.Descriptor, parent ImmutableRef, opts ...RefOption) (ir ImmutableRef, rerr error) {
diffID, err := diffIDFromDescriptor(desc)
if err != nil {
return nil, err
}
chainID := diffID
blobChainID := imagespecidentity.ChainID([]digest.Digest{desc.Digest, diffID})
descHandlers := descHandlersOf(opts...)
if desc.Digest != "" && (descHandlers == nil || descHandlers[desc.Digest] == nil) {
if _, err := cm.ContentStore.Info(ctx, desc.Digest); errors.Is(err, errdefs.ErrNotFound) {
return nil, NeedsRemoteProvidersError([]digest.Digest{desc.Digest})
} else if err != nil {
return nil, err
}
}
var p *immutableRef
var parentID string
if parent != nil {
pInfo := parent.Info()
if pInfo.ChainID == "" || pInfo.BlobChainID == "" {
return nil, errors.Errorf("failed to get ref by blob on non-addressable parent")
}
chainID = imagespecidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID})
blobChainID = imagespecidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID})
p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed, descHandlers)
if err != nil {
return nil, err
}
if err := p2.Finalize(ctx, true); err != nil {
return nil, err
}
parentID = p2.ID()
p = p2.(*immutableRef)
}
releaseParent := false
defer func() {
if releaseParent || rerr != nil && p != nil {
p.Release(context.TODO())
}
}()
cm.mu.Lock()
defer cm.mu.Unlock()
sis, err := cm.MetadataStore.Search("blobchainid:" + blobChainID.String())
if err != nil {
return nil, err
}
if len(sis) > 0 {
ref, err := cm.get(ctx, sis[0].ID(), opts...)
if err != nil && !IsNotFound(err) {
return nil, errors.Wrapf(err, "failed to get record %s by blobchainid", sis[0].ID())
}
if p != nil {
releaseParent = true
}
if err := setImageRefMetadata(ref, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", ref.ID())
}
return ref, nil
}
sis, err = cm.MetadataStore.Search("chainid:" + chainID.String())
if err != nil {
return nil, err
}
var link ImmutableRef
if len(sis) > 0 {
ref, err := cm.get(ctx, sis[0].ID(), opts...)
if err != nil && !IsNotFound(err) {
return nil, errors.Wrapf(err, "failed to get record %s by chainid", sis[0].ID())
}
link = ref
}
id := identity.NewID()
snapshotID := chainID.String()
blobOnly := true
if link != nil {
snapshotID = getSnapshotID(link.Metadata())
blobOnly = getBlobOnly(link.Metadata())
go link.Release(context.TODO())
}
l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = id
l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
}
return nil
})
if err != nil {
return nil, errors.Wrap(err, "failed to create lease")
}
defer func() {
if rerr != nil {
if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{
ID: l.ID,
}); err != nil {
logrus.Errorf("failed to remove lease: %+v", err)
}
}
}()
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: snapshotID,
Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(),
}); err != nil {
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
}
if desc.Digest != "" {
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, leases.Lease{ID: id}, leases.Resource{
ID: desc.Digest.String(),
Type: "content",
}); err != nil {
return nil, errors.Wrapf(err, "failed to add blob %s to lease", id)
}
}
md, _ := cm.md.Get(id)
rec := &cacheRecord{
mu: &sync.Mutex{},
cm: cm,
refs: make(map[ref]struct{}),
parent: p,
md: md,
}
if err := initializeMetadata(rec, parentID, opts...); err != nil {
return nil, err
}
if err := setImageRefMetadata(rec, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
}
queueDiffID(rec.md, diffID.String())
queueBlob(rec.md, desc.Digest.String())
queueChainID(rec.md, chainID.String())
queueBlobChainID(rec.md, blobChainID.String())
queueSnapshotID(rec.md, snapshotID)
queueBlobOnly(rec.md, blobOnly)
queueMediaType(rec.md, desc.MediaType)
queueBlobSize(rec.md, desc.Size)
queueCommitted(rec.md)
if err := rec.md.Commit(); err != nil {
return nil, err
}
cm.records[id] = rec
return rec.ref(true, descHandlers), nil
}
// init loads all snapshots from metadata state and tries to load the records
// from the snapshotter. If snaphot can't be found, metadata is deleted as well.
func (cm *cacheManager) init(ctx context.Context) error {
items, err := cm.md.All()
if err != nil {
return err
}
for _, si := range items {
if _, err := cm.getRecord(ctx, si.ID()); err != nil {
logrus.Debugf("could not load snapshot %s: %+v", si.ID(), err)
cm.md.Clear(si.ID())
cm.LeaseManager.Delete(ctx, leases.Lease{ID: si.ID()})
}
}
return nil
}
// IdentityMapping returns the userns remapping used for refs
func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping {
return cm.Snapshotter.IdentityMapping()
}
// Close closes the manager and releases the metadata database lock. No other
// method should be called after Close.
func (cm *cacheManager) Close() error {
// TODO: allocate internal context and cancel it here
return cm.md.Close()
}
// Get returns an immutable snapshot reference for ID
func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
return cm.get(ctx, id, opts...)
}
func (cm *cacheManager) Metadata(id string) *metadata.StorageItem {
cm.mu.Lock()
defer cm.mu.Unlock()
r, ok := cm.records[id]
if !ok {
return nil
}
return r.Metadata()
}
// get requires manager lock to be taken
func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (*immutableRef, error) {
rec, err := cm.getRecord(ctx, id, opts...)
if err != nil {
return nil, err
}
rec.mu.Lock()
defer rec.mu.Unlock()
triggerUpdate := true
for _, o := range opts {
if o == NoUpdateLastUsed {
triggerUpdate = false
}
}
descHandlers := descHandlersOf(opts...)
if rec.mutable {
if len(rec.refs) != 0 {
return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {
return rec.equalImmutable.ref(triggerUpdate, descHandlers), nil
}
return rec.mref(triggerUpdate, descHandlers).commit(ctx)
}
return rec.ref(triggerUpdate, descHandlers), nil
}
// getRecord returns record for id. Requires manager lock.
func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOption) (cr *cacheRecord, retErr error) {
checkLazyProviders := func(rec *cacheRecord) error {
missing := NeedsRemoteProvidersError(nil)
dhs := descHandlersOf(opts...)
for {
blob := digest.Digest(getBlob(rec.md))
if isLazy, err := rec.isLazy(ctx); err != nil {
return err
} else if isLazy && dhs[blob] == nil {
missing = append(missing, blob)
}
if rec.parent == nil {
break
}
rec = rec.parent.cacheRecord
}
if len(missing) > 0 {
return missing
}
return nil
}
if rec, ok := cm.records[id]; ok {
if rec.isDead() {
return nil, errors.Wrapf(errNotFound, "failed to get dead record %s", id)
}
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
md, ok := cm.md.Get(id)
if !ok {
return nil, errors.Wrapf(errNotFound, "%s not found", id)
}
if mutableID := getEqualMutable(md); mutableID != "" {
mutable, err := cm.getRecord(ctx, mutableID)
if err != nil {
// check loading mutable deleted record from disk
if IsNotFound(err) {
cm.md.Clear(id)
}
return nil, err
}
// parent refs are possibly lazy so keep it hold the description handlers.
var dhs DescHandlers
if mutable.parent != nil {
dhs = mutable.parent.descHandlers
}
rec := &cacheRecord{
mu: &sync.Mutex{},
cm: cm,
refs: make(map[ref]struct{}),
parent: mutable.parentRef(false, dhs),
md: md,
equalMutable: &mutableRef{cacheRecord: mutable},
}
mutable.equalImmutable = &immutableRef{cacheRecord: rec}
cm.records[id] = rec
return rec, nil
}
var parent *immutableRef
if parentID := getParent(md); parentID != "" {
var err error
parent, err = cm.get(ctx, parentID, append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
defer func() {
if retErr != nil {
parent.mu.Lock()
parent.release(context.TODO())
parent.mu.Unlock()
}
}()
}
rec := &cacheRecord{
mu: &sync.Mutex{},
mutable: !getCommitted(md),
cm: cm,
refs: make(map[ref]struct{}),
parent: parent,
md: md,
}
// the record was deleted but we crashed before data on disk was removed
if getDeleted(md) {
if err := rec.remove(ctx, true); err != nil {
return nil, err
}
return nil, errors.Wrapf(errNotFound, "failed to get deleted record %s", id)
}
if err := initializeMetadata(rec, getParent(md), opts...); err != nil {
return nil, err
}
if err := setImageRefMetadata(rec, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
}
cm.records[id] = rec
if err := checkLazyProviders(rec); err != nil {
return nil, err
}
return rec, nil
}
func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Group, opts ...RefOption) (mr MutableRef, err error) {
id := identity.NewID()
var parent *immutableRef
var parentID string
var parentSnapshotID string
if s != nil {
if _, ok := s.(*immutableRef); ok {
parent = s.Clone().(*immutableRef)
} else {
p, err := cm.Get(ctx, s.ID(), append(opts, NoUpdateLastUsed)...)
if err != nil {
return nil, err
}
parent = p.(*immutableRef)
}
if err := parent.Finalize(ctx, true); err != nil {
return nil, err
}
if err := parent.Extract(ctx, sess); err != nil {
return nil, err
}
parentSnapshotID = getSnapshotID(parent.md)
parentID = parent.ID()
}
defer func() {
if err != nil && parent != nil {
parent.Release(context.TODO())
}
}()
l, err := cm.ManagerOpt.LeaseManager.Create(ctx, func(l *leases.Lease) error {
l.ID = id
l.Labels = map[string]string{
"containerd.io/gc.flat": time.Now().UTC().Format(time.RFC3339Nano),
}
return nil
})
if err != nil {
return nil, errors.Wrap(err, "failed to create lease")
}
defer func() {
if err != nil {
if err := cm.ManagerOpt.LeaseManager.Delete(context.TODO(), leases.Lease{
ID: l.ID,
}); err != nil {
logrus.Errorf("failed to remove lease: %+v", err)
}
}
}()
if err := cm.ManagerOpt.LeaseManager.AddResource(ctx, l, leases.Resource{
ID: id,
Type: "snapshots/" + cm.ManagerOpt.Snapshotter.Name(),
}); err != nil {
return nil, errors.Wrapf(err, "failed to add snapshot %s to lease", id)
}
if err := cm.Snapshotter.Prepare(ctx, id, parentSnapshotID); err != nil {
return nil, errors.Wrapf(err, "failed to prepare %s", id)
}
md, _ := cm.md.Get(id)
rec := &cacheRecord{
mu: &sync.Mutex{},
mutable: true,
cm: cm,
refs: make(map[ref]struct{}),
parent: parent,
md: md,
}
if err := initializeMetadata(rec, parentID, opts...); err != nil {
return nil, err
}
if err := setImageRefMetadata(rec, opts...); err != nil {
return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID())
}
cm.mu.Lock()
defer cm.mu.Unlock()
cm.records[id] = rec // TODO: save to db
// parent refs are possibly lazy so keep it hold the description handlers.
var dhs DescHandlers
if parent != nil {
dhs = parent.descHandlers
}
return rec.mref(true, dhs), nil
}
func (cm *cacheManager) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) {
cm.mu.Lock()
defer cm.mu.Unlock()
rec, err := cm.getRecord(ctx, id, opts...)
if err != nil {
return nil, err
}
rec.mu.Lock()
defer rec.mu.Unlock()
if !rec.mutable {
return nil, errors.Wrapf(errInvalid, "%s is not mutable", id)
}
if len(rec.refs) != 0 {
return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
}
if rec.equalImmutable != nil {
if len(rec.equalImmutable.refs) != 0 {
return nil, errors.Wrapf(ErrLocked, "%s is locked", id)
}
delete(cm.records, rec.equalImmutable.ID())
if err := rec.equalImmutable.remove(ctx, false); err != nil {
return nil, err
}
rec.equalImmutable = nil
}
return rec.mref(true, descHandlersOf(opts...)), nil
}
func (cm *cacheManager) Prune(ctx context.Context, ch chan client.UsageInfo, opts ...client.PruneInfo) error {
cm.muPrune.Lock()
for _, opt := range opts {
if err := cm.pruneOnce(ctx, ch, opt); err != nil {
cm.muPrune.Unlock()
return err
}
}
cm.muPrune.Unlock()
if cm.GarbageCollect != nil {
if _, err := cm.GarbageCollect(ctx); err != nil {
return err
}
}
return nil
}
func (cm *cacheManager) pruneOnce(ctx context.Context, ch chan client.UsageInfo, opt client.PruneInfo) error {
filter, err := filters.ParseAll(opt.Filter...)
if err != nil {
return errors.Wrapf(err, "failed to parse prune filters %v", opt.Filter)
}
var check ExternalRefChecker
if f := cm.PruneRefChecker; f != nil && (!opt.All || len(opt.Filter) > 0) {
c, err := f()
if err != nil {
return errors.WithStack(err)
}
check = c
}
totalSize := int64(0)
if opt.KeepBytes != 0 {
du, err := cm.DiskUsage(ctx, client.DiskUsageInfo{})
if err != nil {
return err
}
for _, ui := range du {
if ui.Shared {
continue
}
totalSize += ui.Size
}
}
return cm.prune(ctx, ch, pruneOpt{
filter: filter,
all: opt.All,
checkShared: check,
keepDuration: opt.KeepDuration,
keepBytes: opt.KeepBytes,
totalSize: totalSize,
})
}
func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt pruneOpt) error {
var toDelete []*deleteRecord
if opt.keepBytes != 0 && opt.totalSize < opt.keepBytes {
return nil
}
cm.mu.Lock()
gcMode := opt.keepBytes != 0
cutOff := time.Now().Add(-opt.keepDuration)
locked := map[*sync.Mutex]struct{}{}
for _, cr := range cm.records {
if _, ok := locked[cr.mu]; ok {
continue
}
cr.mu.Lock()
// ignore duplicates that share data
if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
cr.mu.Unlock()
continue
}
if cr.isDead() {
cr.mu.Unlock()
continue
}
if len(cr.refs) == 0 {
recordType := GetRecordType(cr)
if recordType == "" {
recordType = client.UsageRecordTypeRegular
}
shared := false
if opt.checkShared != nil {
shared = opt.checkShared.Exists(cr.ID(), cr.parentChain())
}
if !opt.all {
if recordType == client.UsageRecordTypeInternal || recordType == client.UsageRecordTypeFrontend || shared {
cr.mu.Unlock()
continue
}
}
c := &client.UsageInfo{
ID: cr.ID(),
Mutable: cr.mutable,
RecordType: recordType,
Shared: shared,
}
usageCount, lastUsedAt := getLastUsed(cr.md)
c.LastUsedAt = lastUsedAt
c.UsageCount = usageCount
if opt.keepDuration != 0 {
if lastUsedAt != nil && lastUsedAt.After(cutOff) {
cr.mu.Unlock()
continue
}
}
if opt.filter.Match(adaptUsageInfo(c)) {
toDelete = append(toDelete, &deleteRecord{
cacheRecord: cr,
lastUsedAt: c.LastUsedAt,
usageCount: c.UsageCount,
})
if !gcMode {
cr.dead = true
// mark metadata as deleted in case we crash before cleanup finished
if err := setDeleted(cr.md); err != nil {
cr.mu.Unlock()
cm.mu.Unlock()
return err
}
} else {
locked[cr.mu] = struct{}{}
continue // leave the record locked
}
}
}
cr.mu.Unlock()
}
if gcMode && len(toDelete) > 0 {
sortDeleteRecords(toDelete)
var err error
for i, cr := range toDelete {
// only remove single record at a time
if i == 0 {
cr.dead = true
err = setDeleted(cr.md)
}
cr.mu.Unlock()
}
if err != nil {
return err
}
toDelete = toDelete[:1]
}
cm.mu.Unlock()
if len(toDelete) == 0 {
return nil
}
var err error
for _, cr := range toDelete {
cr.mu.Lock()
usageCount, lastUsedAt := getLastUsed(cr.md)
c := client.UsageInfo{
ID: cr.ID(),
Mutable: cr.mutable,
InUse: len(cr.refs) > 0,
Size: getSize(cr.md),
CreatedAt: GetCreatedAt(cr.md),
Description: GetDescription(cr.md),
LastUsedAt: lastUsedAt,
UsageCount: usageCount,
}
if cr.parent != nil {
c.Parent = cr.parent.ID()
}
if c.Size == sizeUnknown && cr.equalImmutable != nil {
c.Size = getSize(cr.equalImmutable.md) // benefit from DiskUsage calc
}
if c.Size == sizeUnknown {
cr.mu.Unlock() // all the non-prune modifications already protected by cr.dead
s, err := cr.Size(ctx)
if err != nil {
return err
}
c.Size = s
cr.mu.Lock()
}
opt.totalSize -= c.Size
if cr.equalImmutable != nil {
if err1 := cr.equalImmutable.remove(ctx, false); err == nil {
err = err1
}
}
if err1 := cr.remove(ctx, true); err == nil {
err = err1
}
if err == nil && ch != nil {
ch <- c
}
cr.mu.Unlock()
}
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
return cm.prune(ctx, ch, opt)
}
}
func (cm *cacheManager) markShared(m map[string]*cacheUsageInfo) error {
if cm.PruneRefChecker == nil {
return nil
}
c, err := cm.PruneRefChecker()
if err != nil {
return errors.WithStack(err)
}
var markAllParentsShared func(string)
markAllParentsShared = func(id string) {
if v, ok := m[id]; ok {
v.shared = true
if v.parent != "" {
markAllParentsShared(v.parent)
}
}
}
for id := range m {
if m[id].shared {
continue
}
if b := c.Exists(id, m[id].parentChain); b {
markAllParentsShared(id)
}
}
return nil
}
type cacheUsageInfo struct {
refs int
parent string
size int64
mutable bool
createdAt time.Time
usageCount int
lastUsedAt *time.Time
description string
doubleRef bool
recordType client.UsageRecordType
shared bool
parentChain []digest.Digest
}
func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
filter, err := filters.ParseAll(opt.Filter...)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse diskusage filters %v", opt.Filter)
}
cm.mu.Lock()
m := make(map[string]*cacheUsageInfo, len(cm.records))
rescan := make(map[string]struct{}, len(cm.records))
for id, cr := range cm.records {
cr.mu.Lock()
// ignore duplicates that share data
if cr.equalImmutable != nil && len(cr.equalImmutable.refs) > 0 || cr.equalMutable != nil && len(cr.refs) == 0 {
cr.mu.Unlock()
continue
}
usageCount, lastUsedAt := getLastUsed(cr.md)
c := &cacheUsageInfo{
refs: len(cr.refs),
mutable: cr.mutable,
size: getSize(cr.md),
createdAt: GetCreatedAt(cr.md),
usageCount: usageCount,
lastUsedAt: lastUsedAt,
description: GetDescription(cr.md),
doubleRef: cr.equalImmutable != nil,
recordType: GetRecordType(cr),
parentChain: cr.parentChain(),
}
if c.recordType == "" {
c.recordType = client.UsageRecordTypeRegular
}
if cr.parent != nil {
c.parent = cr.parent.ID()
}
if cr.mutable && c.refs > 0 {
c.size = 0 // size can not be determined because it is changing
}
m[id] = c
rescan[id] = struct{}{}
cr.mu.Unlock()
}
cm.mu.Unlock()
for {
if len(rescan) == 0 {
break
}
for id := range rescan {
v := m[id]
if v.refs == 0 && v.parent != "" {
m[v.parent].refs--
if v.doubleRef {
m[v.parent].refs--
}
rescan[v.parent] = struct{}{}
}
delete(rescan, id)
}
}
if err := cm.markShared(m); err != nil {
return nil, err
}
var du []*client.UsageInfo
for id, cr := range m {
c := &client.UsageInfo{
ID: id,
Mutable: cr.mutable,
InUse: cr.refs > 0,
Size: cr.size,
Parent: cr.parent,
CreatedAt: cr.createdAt,
Description: cr.description,
LastUsedAt: cr.lastUsedAt,
UsageCount: cr.usageCount,
RecordType: cr.recordType,
Shared: cr.shared,
}
if filter.Match(adaptUsageInfo(c)) {
du = append(du, c)
}
}
eg, ctx := errgroup.WithContext(ctx)
for _, d := range du {
if d.Size == sizeUnknown {
func(d *client.UsageInfo) {
eg.Go(func() error {
ref, err := cm.Get(ctx, d.ID, NoUpdateLastUsed)
if err != nil {
d.Size = 0
return nil
}
s, err := ref.Size(ctx)
if err != nil {
return err
}
d.Size = s
return ref.Release(context.TODO())
})
}(d)
}
}
if err := eg.Wait(); err != nil {
return du, err
}
return du, nil
}
func IsNotFound(err error) bool {
return errors.Is(err, errNotFound)
}
type RefOption interface{}
type cachePolicy int
const (
cachePolicyDefault cachePolicy = iota
cachePolicyRetain
)
type withMetadata interface {
Metadata() *metadata.StorageItem
}
type noUpdateLastUsed struct{}
var NoUpdateLastUsed noUpdateLastUsed
func HasCachePolicyRetain(m withMetadata) bool {
return getCachePolicy(m.Metadata()) == cachePolicyRetain
}
func CachePolicyRetain(m withMetadata) error {
return queueCachePolicy(m.Metadata(), cachePolicyRetain)
}
func CachePolicyDefault(m withMetadata) error {
return queueCachePolicy(m.Metadata(), cachePolicyDefault)
}
func WithDescription(descr string) RefOption {
return func(m withMetadata) error {
return queueDescription(m.Metadata(), descr)
}
}
func WithRecordType(t client.UsageRecordType) RefOption {
return func(m withMetadata) error {
return queueRecordType(m.Metadata(), t)
}
}
func WithCreationTime(tm time.Time) RefOption {
return func(m withMetadata) error {
return queueCreatedAt(m.Metadata(), tm)
}
}
// Need a separate type for imageRef because it needs to be called outside
// initializeMetadata while still being a RefOption, so wrapping it in a
// different type ensures initializeMetadata won't catch it too and duplicate
// setting the metadata.
type imageRefOption func(m withMetadata) error
// WithImageRef appends the given imageRef to the cache ref's metadata
func WithImageRef(imageRef string) RefOption {
return imageRefOption(func(m withMetadata) error {
return appendImageRef(m.Metadata(), imageRef)
})
}
func setImageRefMetadata(m withMetadata, opts ...RefOption) error {
md := m.Metadata()
for _, opt := range opts {
if fn, ok := opt.(imageRefOption); ok {
if err := fn(m); err != nil {
return err
}
}
}
return md.Commit()
}
func initializeMetadata(m withMetadata, parent string, opts ...RefOption) error {
md := m.Metadata()
if tm := GetCreatedAt(md); !tm.IsZero() {
return nil
}
if err := queueParent(md, parent); err != nil {
return err
}
if err := queueCreatedAt(md, time.Now()); err != nil {
return err
}
for _, opt := range opts {
if fn, ok := opt.(func(withMetadata) error); ok {
if err := fn(m); err != nil {
return err
}
}
}
return md.Commit()
}
func adaptUsageInfo(info *client.UsageInfo) filters.Adaptor {
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "id":
return info.ID, info.ID != ""
case "parent":
return info.Parent, info.Parent != ""
case "description":
return info.Description, info.Description != ""
case "inuse":
return "", info.InUse
case "mutable":
return "", info.Mutable
case "immutable":
return "", !info.Mutable
case "type":
return string(info.RecordType), info.RecordType != ""
case "shared":
return "", info.Shared
case "private":
return "", !info.Shared
}
// TODO: add int/datetime/bytes support for more fields
return "", false
})
}
type pruneOpt struct {
filter filters.Filter
all bool
checkShared ExternalRefChecker
keepDuration time.Duration
keepBytes int64
totalSize int64
}
type deleteRecord struct {
*cacheRecord
lastUsedAt *time.Time
usageCount int
lastUsedAtIndex int
usageCountIndex int
}
func sortDeleteRecords(toDelete []*deleteRecord) {
sort.Slice(toDelete, func(i, j int) bool {
if toDelete[i].lastUsedAt == nil {
return true
}
if toDelete[j].lastUsedAt == nil {
return false
}
return toDelete[i].lastUsedAt.Before(*toDelete[j].lastUsedAt)
})
maxLastUsedIndex := 0
var val time.Time
for _, v := range toDelete {
if v.lastUsedAt != nil && v.lastUsedAt.After(val) {
val = *v.lastUsedAt
maxLastUsedIndex++
}
v.lastUsedAtIndex = maxLastUsedIndex
}
sort.Slice(toDelete, func(i, j int) bool {
return toDelete[i].usageCount < toDelete[j].usageCount
})
maxUsageCountIndex := 0
var count int
for _, v := range toDelete {
if v.usageCount != count {
count = v.usageCount
maxUsageCountIndex++
}
v.usageCountIndex = maxUsageCountIndex
}
sort.Slice(toDelete, func(i, j int) bool {
return float64(toDelete[i].lastUsedAtIndex)/float64(maxLastUsedIndex)+
float64(toDelete[i].usageCountIndex)/float64(maxUsageCountIndex) <
float64(toDelete[j].lastUsedAtIndex)/float64(maxLastUsedIndex)+
float64(toDelete[j].usageCountIndex)/float64(maxUsageCountIndex)
})
}
func diffIDFromDescriptor(desc ocispec.Descriptor) (digest.Digest, error) {
diffIDStr, ok := desc.Annotations["containerd.io/uncompressed"]
if !ok {
return "", errors.Errorf("missing uncompressed annotation for %s", desc.Digest)
}
diffID, err := digest.Parse(diffIDStr)
if err != nil {
return "", errors.Wrapf(err, "failed to parse diffID %q for %s", diffIDStr, desc.Digest)
}
return diffID, nil
}