| package metadata |
| |
| import ( |
| "context" |
| "encoding/binary" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/boltdb/bolt" |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/errdefs" |
| "github.com/containerd/containerd/filters" |
| "github.com/containerd/containerd/labels" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/metadata/boltutil" |
| "github.com/containerd/containerd/namespaces" |
| digest "github.com/opencontainers/go-digest" |
| "github.com/pkg/errors" |
| ) |
| |
| type contentStore struct { |
| content.Store |
| db *DB |
| l sync.RWMutex |
| } |
| |
| // newContentStore returns a namespaced content store using an existing |
| // content store interface. |
| func newContentStore(db *DB, cs content.Store) *contentStore { |
| return &contentStore{ |
| Store: cs, |
| db: db, |
| } |
| } |
| |
| func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return content.Info{}, err |
| } |
| |
| var info content.Info |
| if err := view(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getBlobBucket(tx, ns, dgst) |
| if bkt == nil { |
| return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) |
| } |
| |
| info.Digest = dgst |
| return readInfo(&info, bkt) |
| }); err != nil { |
| return content.Info{}, err |
| } |
| |
| return info, nil |
| } |
| |
| func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return content.Info{}, err |
| } |
| |
| cs.l.RLock() |
| defer cs.l.RUnlock() |
| |
| updated := content.Info{ |
| Digest: info.Digest, |
| } |
| if err := update(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getBlobBucket(tx, ns, info.Digest) |
| if bkt == nil { |
| return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest) |
| } |
| |
| if err := readInfo(&updated, bkt); err != nil { |
| return errors.Wrapf(err, "info %q", info.Digest) |
| } |
| |
| if len(fieldpaths) > 0 { |
| for _, path := range fieldpaths { |
| if strings.HasPrefix(path, "labels.") { |
| if updated.Labels == nil { |
| updated.Labels = map[string]string{} |
| } |
| |
| key := strings.TrimPrefix(path, "labels.") |
| updated.Labels[key] = info.Labels[key] |
| continue |
| } |
| |
| switch path { |
| case "labels": |
| updated.Labels = info.Labels |
| default: |
| return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest) |
| } |
| } |
| } else { |
| // Set mutable fields |
| updated.Labels = info.Labels |
| } |
| if err := validateInfo(&updated); err != nil { |
| return err |
| } |
| |
| updated.UpdatedAt = time.Now().UTC() |
| return writeInfo(&updated, bkt) |
| }); err != nil { |
| return content.Info{}, err |
| } |
| return updated, nil |
| } |
| |
| func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return err |
| } |
| |
| filter, err := filters.ParseAll(fs...) |
| if err != nil { |
| return err |
| } |
| |
| // TODO: Batch results to keep from reading all info into memory |
| var infos []content.Info |
| if err := view(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getBlobsBucket(tx, ns) |
| if bkt == nil { |
| return nil |
| } |
| |
| return bkt.ForEach(func(k, v []byte) error { |
| dgst, err := digest.Parse(string(k)) |
| if err != nil { |
| // Not a digest, skip |
| return nil |
| } |
| bbkt := bkt.Bucket(k) |
| if bbkt == nil { |
| return nil |
| } |
| info := content.Info{ |
| Digest: dgst, |
| } |
| if err := readInfo(&info, bkt.Bucket(k)); err != nil { |
| return err |
| } |
| if filter.Match(adaptContentInfo(info)) { |
| infos = append(infos, info) |
| } |
| return nil |
| }) |
| }); err != nil { |
| return err |
| } |
| |
| for _, info := range infos { |
| if err := fn(info); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return err |
| } |
| |
| cs.l.RLock() |
| defer cs.l.RUnlock() |
| |
| return update(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getBlobBucket(tx, ns, dgst) |
| if bkt == nil { |
| return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) |
| } |
| |
| if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil { |
| return err |
| } |
| |
| // Mark content store as dirty for triggering garbage collection |
| cs.db.dirtyL.Lock() |
| cs.db.dirtyCS = true |
| cs.db.dirtyL.Unlock() |
| |
| return nil |
| }) |
| } |
| |
| func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| filter, err := filters.ParseAll(fs...) |
| if err != nil { |
| return nil, err |
| } |
| |
| brefs := map[string]string{} |
| if err := view(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getIngestBucket(tx, ns) |
| if bkt == nil { |
| return nil |
| } |
| |
| return bkt.ForEach(func(k, v []byte) error { |
| // TODO(dmcgowan): match name and potentially labels here |
| brefs[string(k)] = string(v) |
| return nil |
| }) |
| }); err != nil { |
| return nil, err |
| } |
| |
| statuses := make([]content.Status, 0, len(brefs)) |
| for k, bref := range brefs { |
| status, err := cs.Store.Status(ctx, bref) |
| if err != nil { |
| if errdefs.IsNotFound(err) { |
| continue |
| } |
| return nil, err |
| } |
| status.Ref = k |
| |
| if filter.Match(adaptContentStatus(status)) { |
| statuses = append(statuses, status) |
| } |
| } |
| |
| return statuses, nil |
| |
| } |
| |
| func getRef(tx *bolt.Tx, ns, ref string) string { |
| bkt := getIngestBucket(tx, ns) |
| if bkt == nil { |
| return "" |
| } |
| v := bkt.Get([]byte(ref)) |
| if len(v) == 0 { |
| return "" |
| } |
| return string(v) |
| } |
| |
| func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return content.Status{}, err |
| } |
| |
| var bref string |
| if err := view(ctx, cs.db, func(tx *bolt.Tx) error { |
| bref = getRef(tx, ns, ref) |
| if bref == "" { |
| return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) |
| } |
| |
| return nil |
| }); err != nil { |
| return content.Status{}, err |
| } |
| |
| st, err := cs.Store.Status(ctx, bref) |
| if err != nil { |
| return content.Status{}, err |
| } |
| st.Ref = ref |
| return st, nil |
| } |
| |
| func (cs *contentStore) Abort(ctx context.Context, ref string) error { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return err |
| } |
| |
| cs.l.RLock() |
| defer cs.l.RUnlock() |
| |
| return update(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getIngestBucket(tx, ns) |
| if bkt == nil { |
| return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) |
| } |
| bref := string(bkt.Get([]byte(ref))) |
| if bref == "" { |
| return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref) |
| } |
| if err := bkt.Delete([]byte(ref)); err != nil { |
| return err |
| } |
| |
| return cs.Store.Abort(ctx, bref) |
| }) |
| |
| } |
| |
| func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| cs.l.RLock() |
| defer cs.l.RUnlock() |
| |
| var w content.Writer |
| if err := update(ctx, cs.db, func(tx *bolt.Tx) error { |
| if expected != "" { |
| cbkt := getBlobBucket(tx, ns, expected) |
| if cbkt != nil { |
| return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected) |
| } |
| } |
| |
| bkt, err := createIngestBucket(tx, ns) |
| if err != nil { |
| return err |
| } |
| |
| var ( |
| bref string |
| brefb = bkt.Get([]byte(ref)) |
| ) |
| |
| if brefb == nil { |
| sid, err := bkt.NextSequence() |
| if err != nil { |
| return err |
| } |
| |
| bref = createKey(sid, ns, ref) |
| if err := bkt.Put([]byte(ref), []byte(bref)); err != nil { |
| return err |
| } |
| } else { |
| bref = string(brefb) |
| } |
| |
| // Do not use the passed in expected value here since it was |
| // already checked against the user metadata. If the content |
| // store has the content, it must still be written before |
| // linked into the given namespace. It is possible in the future |
| // to allow content which exists in content store but not |
| // namespace to be linked here and returned an exist error, but |
| // this would require more configuration to make secure. |
| w, err = cs.Store.Writer(ctx, bref, size, "") |
| return err |
| }); err != nil { |
| return nil, err |
| } |
| |
| // TODO: keep the expected in the writer to use on commit |
| // when no expected is provided there. |
| return &namespacedWriter{ |
| Writer: w, |
| ref: ref, |
| namespace: ns, |
| db: cs.db, |
| l: &cs.l, |
| }, nil |
| } |
| |
| type namespacedWriter struct { |
| content.Writer |
| ref string |
| namespace string |
| db transactor |
| l *sync.RWMutex |
| } |
| |
| func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { |
| nw.l.RLock() |
| defer nw.l.RUnlock() |
| |
| return update(ctx, nw.db, func(tx *bolt.Tx) error { |
| bkt := getIngestBucket(tx, nw.namespace) |
| if bkt != nil { |
| if err := bkt.Delete([]byte(nw.ref)); err != nil { |
| return err |
| } |
| } |
| return nw.commit(ctx, tx, size, expected, opts...) |
| }) |
| } |
| |
| func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error { |
| var base content.Info |
| for _, opt := range opts { |
| if err := opt(&base); err != nil { |
| return err |
| } |
| } |
| if err := validateInfo(&base); err != nil { |
| return err |
| } |
| |
| status, err := nw.Writer.Status() |
| if err != nil { |
| return err |
| } |
| if size != 0 && size != status.Offset { |
| return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size) |
| } |
| size = status.Offset |
| |
| actual := nw.Writer.Digest() |
| |
| if err := nw.Writer.Commit(ctx, size, expected); err != nil { |
| if !errdefs.IsAlreadyExists(err) { |
| return err |
| } |
| if getBlobBucket(tx, nw.namespace, actual) != nil { |
| return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual) |
| } |
| } |
| |
| bkt, err := createBlobBucket(tx, nw.namespace, actual) |
| if err != nil { |
| return err |
| } |
| |
| commitTime := time.Now().UTC() |
| |
| sizeEncoded, err := encodeInt(size) |
| if err != nil { |
| return err |
| } |
| |
| if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil { |
| return err |
| } |
| if err := boltutil.WriteLabels(bkt, base.Labels); err != nil { |
| return err |
| } |
| return bkt.Put(bucketKeySize, sizeEncoded) |
| } |
| |
| func (nw *namespacedWriter) Status() (content.Status, error) { |
| st, err := nw.Writer.Status() |
| if err == nil { |
| st.Ref = nw.ref |
| } |
| return st, err |
| } |
| |
| func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { |
| if err := cs.checkAccess(ctx, dgst); err != nil { |
| return nil, err |
| } |
| return cs.Store.ReaderAt(ctx, dgst) |
| } |
| |
| func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { |
| ns, err := namespaces.NamespaceRequired(ctx) |
| if err != nil { |
| return err |
| } |
| |
| return view(ctx, cs.db, func(tx *bolt.Tx) error { |
| bkt := getBlobBucket(tx, ns, dgst) |
| if bkt == nil { |
| return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) |
| } |
| return nil |
| }) |
| } |
| |
| func validateInfo(info *content.Info) error { |
| for k, v := range info.Labels { |
| if err := labels.Validate(k, v); err == nil { |
| return errors.Wrapf(err, "info.Labels") |
| } |
| } |
| |
| return nil |
| } |
| |
| func readInfo(info *content.Info, bkt *bolt.Bucket) error { |
| if err := boltutil.ReadTimestamps(bkt, &info.CreatedAt, &info.UpdatedAt); err != nil { |
| return err |
| } |
| |
| labels, err := boltutil.ReadLabels(bkt) |
| if err != nil { |
| return err |
| } |
| info.Labels = labels |
| |
| if v := bkt.Get(bucketKeySize); len(v) > 0 { |
| info.Size, _ = binary.Varint(v) |
| } |
| |
| return nil |
| } |
| |
| func writeInfo(info *content.Info, bkt *bolt.Bucket) error { |
| if err := boltutil.WriteTimestamps(bkt, info.CreatedAt, info.UpdatedAt); err != nil { |
| return err |
| } |
| |
| if err := boltutil.WriteLabels(bkt, info.Labels); err != nil { |
| return errors.Wrapf(err, "writing labels for info %v", info.Digest) |
| } |
| |
| // Write size |
| sizeEncoded, err := encodeInt(info.Size) |
| if err != nil { |
| return err |
| } |
| |
| return bkt.Put(bucketKeySize, sizeEncoded) |
| } |
| |
| func (cs *contentStore) garbageCollect(ctx context.Context) error { |
| lt1 := time.Now() |
| cs.l.Lock() |
| defer func() { |
| cs.l.Unlock() |
| log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected") |
| }() |
| |
| seen := map[string]struct{}{} |
| if err := cs.db.View(func(tx *bolt.Tx) error { |
| v1bkt := tx.Bucket(bucketKeyVersion) |
| if v1bkt == nil { |
| return nil |
| } |
| |
| // iterate through each namespace |
| v1c := v1bkt.Cursor() |
| |
| for k, v := v1c.First(); k != nil; k, v = v1c.Next() { |
| if v != nil { |
| continue |
| } |
| |
| cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) |
| if cbkt == nil { |
| continue |
| } |
| bbkt := cbkt.Bucket(bucketKeyObjectBlob) |
| if err := bbkt.ForEach(func(ck, cv []byte) error { |
| if cv == nil { |
| seen[string(ck)] = struct{}{} |
| } |
| return nil |
| }); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| if err := cs.Store.Walk(ctx, func(info content.Info) error { |
| if _, ok := seen[info.Digest.String()]; !ok { |
| if err := cs.Store.Delete(ctx, info.Digest); err != nil { |
| return err |
| } |
| log.G(ctx).WithField("digest", info.Digest).Debug("removed content") |
| } |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| return nil |
| } |