package metadata
import (
"context"
"encoding/binary"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
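// contentStore wraps an underlying content.Store with namespace-scoped
// metadata kept in the bolt database.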
type contentStore struct {
content.Store
db *DB
l sync.RWMutex
}
// newContentStore returns a namespaced content store using an existing
// content store interface.
func newContentStore(db *DB, cs content.Store) *contentStore {
return &contentStore{
Store: cs,
db: db,
}
}
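// Info returns the info for the given digest from the namespaced blob
// bucket, failing with ErrNotFound if the digest is not linked into the
// caller's namespace.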
func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Info{}, err
}
var info content.Info
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
info.Digest = dgst
return readInfo(&info, bkt)
}); err != nil {
return content.Info{}, err
}
return info, nil
}
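// Update modifies the mutable fields (currently only labels) of the stored
// info. When fieldpaths are provided, only the named fields are updated.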
func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Info{}, err
}
cs.l.RLock()
defer cs.l.RUnlock()
updated := content.Info{
Digest: info.Digest,
}
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, info.Digest)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest)
}
if err := readInfo(&updated, bkt); err != nil {
return errors.Wrapf(err, "info %q", info.Digest)
}
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
updated.Labels = info.Labels
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
}
}
} else {
// Set mutable fields
updated.Labels = info.Labels
}
if err := validateInfo(&updated); err != nil {
return err
}
updated.UpdatedAt = time.Now().UTC()
return writeInfo(&updated, bkt)
}); err != nil {
return content.Info{}, err
}
return updated, nil
}
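// Walk calls fn for every blob in the caller's namespace that matches the
// provided filters.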
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
filter, err := filters.ParseAll(fs...)
if err != nil {
return err
}
// TODO: Batch results to keep from reading all info into memory
var infos []content.Info
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobsBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
dgst, err := digest.Parse(string(k))
if err != nil {
// Not a digest, skip
return nil
}
bbkt := bkt.Bucket(k)
if bbkt == nil {
return nil
}
info := content.Info{
Digest: dgst,
}
if err := readInfo(&info, bbkt); err != nil {
return err
}
if filter.Match(adaptContentInfo(info)) {
infos = append(infos, info)
}
return nil
})
}); err != nil {
return err
}
for _, info := range infos {
if err := fn(info); err != nil {
return err
}
}
return nil
}
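// Delete removes the blob's metadata from the namespace and marks the
// content store dirty so the shared data can be garbage collected.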
func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil {
return err
}
// Mark content store as dirty for triggering garbage collection
cs.db.dirtyL.Lock()
cs.db.dirtyCS = true
cs.db.dirtyL.Unlock()
return nil
})
}
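// ListStatuses returns the status of every active ingest in the namespace,
// filtered by the provided filter expressions.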
func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
filter, err := filters.ParseAll(fs...)
if err != nil {
return nil, err
}
brefs := map[string]string{}
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(v)
return nil
})
}); err != nil {
return nil, err
}
statuses := make([]content.Status, 0, len(brefs))
for k, bref := range brefs {
status, err := cs.Store.Status(ctx, bref)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
status.Ref = k
if filter.Match(adaptContentStatus(status)) {
statuses = append(statuses, status)
}
}
return statuses, nil
}
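// getRef resolves a caller-facing ingest ref to the backing store ref
// recorded in the namespace's ingest bucket.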
func getRef(tx *bolt.Tx, ns, ref string) string {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return ""
}
v := bkt.Get([]byte(ref))
if len(v) == 0 {
return ""
}
return string(v)
}
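// Status reports the status of a single ingest, translating the namespaced
// ref to the underlying store ref.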
func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Status{}, err
}
var bref string
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bref = getRef(tx, ns, ref)
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
return nil
}); err != nil {
return content.Status{}, err
}
st, err := cs.Store.Status(ctx, bref)
if err != nil {
return content.Status{}, err
}
st.Ref = ref
return st, nil
}
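// Abort removes the ingest ref from the namespace and aborts the
// corresponding ingest in the underlying store.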
func (cs *contentStore) Abort(ctx context.Context, ref string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bref := string(bkt.Get([]byte(ref)))
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
if err := bkt.Delete([]byte(ref)); err != nil {
return err
}
return cs.Store.Abort(ctx, bref)
})
}
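// Writer opens (or resumes) an ingest for ref in the caller's namespace,
// mapping it to a unique ref in the underlying store. If the expected
// digest is already linked into the namespace, ErrAlreadyExists is returned.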
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
cs.l.RLock()
defer cs.l.RUnlock()
var w content.Writer
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
if expected != "" {
cbkt := getBlobBucket(tx, ns, expected)
if cbkt != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
}
}
bkt, err := createIngestBucket(tx, ns)
if err != nil {
return err
}
var (
bref string
brefb = bkt.Get([]byte(ref))
)
if brefb == nil {
sid, err := bkt.NextSequence()
if err != nil {
return err
}
bref = createKey(sid, ns, ref)
if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
return err
}
} else {
bref = string(brefb)
}
// Do not use the passed in expected value here since it was
// already checked against the user metadata. If the content
// store has the content, it must still be written before being
// linked into the given namespace. It is possible in the future
// to allow content which exists in the content store but not the
// namespace to be linked here and return an already-exists error,
// but this would require more configuration to make secure.
w, err = cs.Store.Writer(ctx, bref, size, "")
return err
}); err != nil {
return nil, err
}
// TODO: keep the expected in the writer to use on commit
// when no expected is provided there.
return &namespacedWriter{
Writer: w,
ref: ref,
namespace: ns,
db: cs.db,
l: &cs.l,
}, nil
}
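// namespacedWriter commits written content into the caller's namespace in
// addition to the underlying store.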
type namespacedWriter struct {
content.Writer
ref string
namespace string
db transactor
l *sync.RWMutex
}
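// Commit removes the ingest record and links the committed blob into the
// namespace within a single metadata transaction.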
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
nw.l.RLock()
defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.Delete([]byte(nw.ref)); err != nil {
return err
}
}
return nw.commit(ctx, tx, size, expected, opts...)
})
}
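// commit validates the written data against the expected size and digest,
// commits it to the underlying store and records size, timestamps and
// labels in the namespace's blob bucket.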
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
}
}
if err := validateInfo(&base); err != nil {
return err
}
status, err := nw.Writer.Status()
if err != nil {
return err
}
if size != 0 && size != status.Offset {
return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual := nw.Writer.Digest()
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
}
return bkt.Put(bucketKeySize, sizeEncoded)
}
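// Status reports the underlying writer status with the caller-facing ref
// substituted back in.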
func (nw *namespacedWriter) Status() (content.Status, error) {
st, err := nw.Writer.Status()
if err == nil {
st.Ref = nw.ref
}
return st, err
}
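// ReaderAt returns a reader for the blob after verifying it is visible in
// the caller's namespace.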
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
if err := cs.checkAccess(ctx, dgst); err != nil {
return nil, err
}
return cs.Store.ReaderAt(ctx, dgst)
}
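// checkAccess returns ErrNotFound unless the digest is linked into the
// caller's namespace.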
func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
return nil
})
}
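// validateInfo rejects info whose labels fail validation.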
func validateInfo(info *content.Info) error {
for k, v := range info.Labels {
if err := labels.Validate(k, v); err != nil {
return errors.Wrapf(err, "info.Labels")
}
}
return nil
}
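// readInfo populates info from the timestamps, labels and size stored in
// the blob bucket.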
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
if err := boltutil.ReadTimestamps(bkt, &info.CreatedAt, &info.UpdatedAt); err != nil {
return err
}
labels, err := boltutil.ReadLabels(bkt)
if err != nil {
return err
}
info.Labels = labels
if v := bkt.Get(bucketKeySize); len(v) > 0 {
info.Size, _ = binary.Varint(v)
}
return nil
}
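// writeInfo persists the info's timestamps, labels and size to the blob
// bucket.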
func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
if err := boltutil.WriteTimestamps(bkt, info.CreatedAt, info.UpdatedAt); err != nil {
return err
}
if err := boltutil.WriteLabels(bkt, info.Labels); err != nil {
return errors.Wrapf(err, "writing labels for info %v", info.Digest)
}
// Write size
sizeEncoded, err := encodeInt(info.Size)
if err != nil {
return err
}
return bkt.Put(bucketKeySize, sizeEncoded)
}
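// garbageCollect removes blobs from the underlying store that are no longer
// referenced by any namespace.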
func (cs *contentStore) garbageCollect(ctx context.Context) error {
lt1 := time.Now()
cs.l.Lock()
defer func() {
cs.l.Unlock()
log.G(ctx).WithField("t", time.Since(lt1)).Debugf("content garbage collected")
}()
seen := map[string]struct{}{}
if err := cs.db.View(func(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
if cbkt == nil {
continue
}
bbkt := cbkt.Bucket(bucketKeyObjectBlob)
if bbkt == nil {
// Namespace has a content bucket but no committed blobs yet
continue
}
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
}
return nil
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
if err := cs.Store.Walk(ctx, func(info content.Info) error {
if _, ok := seen[info.Digest.String()]; !ok {
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
return err
}
log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
}
return nil
}); err != nil {
return err
}
return nil
}