Merge pull request #41607 from cpuguy83/use_head_for_manifest_by_tag

cache manifests on pull
diff --git a/daemon/content.go b/daemon/content.go
new file mode 100644
index 0000000..01d1e23
--- /dev/null
+++ b/daemon/content.go
@@ -0,0 +1,30 @@
+package daemon
+
+import (
+	"os"
+	"path/filepath"
+
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/content/local"
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/metadata"
+	"github.com/pkg/errors"
+	"go.etcd.io/bbolt"
+)
+
+func (d *Daemon) configureLocalContentStore() (content.Store, leases.Manager, error) {
+	if err := os.MkdirAll(filepath.Join(d.root, "content"), 0700); err != nil {
+		return nil, nil, errors.Wrap(err, "error creating dir for content store")
+	}
+	db, err := bbolt.Open(filepath.Join(d.root, "content", "metadata.db"), 0600, nil)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "error opening bolt db for content metadata store")
+	}
+	cs, err := local.NewStore(filepath.Join(d.root, "content", "data"))
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "error setting up content store")
+	}
+	md := metadata.NewDB(db, cs, nil)
+	d.mdDB = db
+	return md.ContentStore(), metadata.NewLeaseManager(md), nil
+}
diff --git a/daemon/daemon.go b/daemon/daemon.go
index f81f45a..9e3a0fa 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -20,6 +20,7 @@
 	"time"
 
 	"github.com/docker/docker/pkg/fileutils"
+	"go.etcd.io/bbolt"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
 
@@ -129,6 +130,11 @@
 
 	attachmentStore       network.AttachmentStore
 	attachableNetworkLock *locker.Locker
+
+	// This is used for Windows which doesn't currently support running on containerd
+	// It stores metadata for the content store (used for manifest caching)
+	// This needs to be closed on daemon exit
+	mdDB *bbolt.DB
 }
 
 // StoreHosts stores the addresses the daemon is listening on
@@ -1066,10 +1072,7 @@
 
 	d.linkIndex = newLinkIndex()
 
-	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
-	// used above to run migration. They could be initialized in ImageService
-	// if migration is called from daemon/images. layerStore might move as well.
-	d.imageService = images.NewImageService(images.ImageServiceConfig{
+	imgSvcConfig := images.ImageServiceConfig{
 		ContainerStore:            d.containers,
 		DistributionMetadataStore: distributionMetadataStore,
 		EventsService:             d.EventsService,
@@ -1081,7 +1084,28 @@
 		ReferenceStore:            rs,
 		RegistryService:           registryService,
 		TrustKey:                  trustKey,
-	})
+		ContentNamespace:          config.ContainerdNamespace,
+	}
+
+	// containerd is not currently supported with Windows.
+	// So sometimes d.containerdCli will be nil
+	// In that case we'll create a local content store... but otherwise we'll use containerd
+	if d.containerdCli != nil {
+		imgSvcConfig.Leases = d.containerdCli.LeasesService()
+		imgSvcConfig.ContentStore = d.containerdCli.ContentStore()
+	} else {
+		cs, lm, err := d.configureLocalContentStore()
+		if err != nil {
+			return nil, err
+		}
+		imgSvcConfig.ContentStore = cs
+		imgSvcConfig.Leases = lm
+	}
+
+	// TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
+	// used above to run migration. They could be initialized in ImageService
+	// if migration is called from daemon/images. layerStore might move as well.
+	d.imageService = images.NewImageService(imgSvcConfig)
 
 	go d.execCommandGC()
 
@@ -1246,6 +1270,10 @@
 		daemon.containerdCli.Close()
 	}
 
+	if daemon.mdDB != nil {
+		daemon.mdDB.Close()
+	}
+
 	return daemon.cleanupMounts()
 }
 
diff --git a/daemon/images/image_pull.go b/daemon/images/image_pull.go
index 4eedfdd..ed9099b 100644
--- a/daemon/images/image_pull.go
+++ b/daemon/images/image_pull.go
@@ -6,6 +6,8 @@
 	"strings"
 	"time"
 
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/namespaces"
 	dist "github.com/docker/distribution"
 	"github.com/docker/distribution/reference"
 	"github.com/docker/docker/api/types"
@@ -16,6 +18,7 @@
 	"github.com/docker/docker/registry"
 	digest "github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
 )
 
 // PullImage initiates a pull operation. image is the repository name to pull, and
@@ -65,6 +68,25 @@
 		close(writesDone)
 	}()
 
+	ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
+	// Take out a temporary lease for everything that gets persisted to the content store.
+	// Before the lease is cancelled, any content we want to keep should have it's own lease applied.
+	ctx, done, err := tempLease(ctx, i.leases)
+	if err != nil {
+		return err
+	}
+	defer done(ctx)
+
+	cs := &contentStoreForPull{
+		ContentStore: i.content,
+		leases:       i.leases,
+	}
+	imageStore := &imageStoreForPull{
+		ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
+		ingested:         cs,
+		leases:           i.leases,
+	}
+
 	imagePullConfig := &distribution.ImagePullConfig{
 		Config: distribution.Config{
 			MetaHeaders:      metaHeaders,
@@ -73,7 +95,7 @@
 			RegistryService:  i.registryService,
 			ImageEventLogger: i.LogImageEvent,
 			MetadataStore:    i.distributionMetadataStore,
-			ImageStore:       distribution.NewImageConfigStoreFromStore(i.imageStore),
+			ImageStore:       imageStore,
 			ReferenceStore:   i.referenceStore,
 		},
 		DownloadManager: i.downloadManager,
@@ -81,7 +103,7 @@
 		Platform:        platform,
 	}
 
-	err := distribution.Pull(ctx, ref, imagePullConfig)
+	err = distribution.Pull(ctx, ref, imagePullConfig, cs)
 	close(progressChan)
 	<-writesDone
 	return err
@@ -124,3 +146,29 @@
 	}
 	return repository, confirmedV2, lastError
 }
+
+func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {
+	nop := func(context.Context) error { return nil }
+	_, ok := leases.FromContext(ctx)
+	if ok {
+		return ctx, nop, nil
+	}
+
+	// Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc.
+	opts := []leases.Opt{
+		leases.WithRandomID(),
+		leases.WithExpiration(24 * time.Hour),
+		leases.WithLabels(map[string]string{
+			"moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano),
+		}),
+	}
+	l, err := mgr.Create(ctx, opts...)
+	if err != nil {
+		return ctx, nop, errors.Wrap(err, "error creating temporary lease")
+	}
+
+	ctx = leases.WithLease(ctx, l.ID)
+	return ctx, func(ctx context.Context) error {
+		return mgr.Delete(ctx, l)
+	}, nil
+}
diff --git a/daemon/images/service.go b/daemon/images/service.go
index 388aa2d..e0297c3 100644
--- a/daemon/images/service.go
+++ b/daemon/images/service.go
@@ -5,6 +5,8 @@
 	"os"
 	"runtime"
 
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/leases"
 	"github.com/docker/docker/container"
 	daemonevents "github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/distribution"
@@ -42,6 +44,9 @@
 	ReferenceStore            dockerreference.Store
 	RegistryService           registry.Service
 	TrustKey                  libtrust.PrivateKey
+	ContentStore              content.Store
+	Leases                    leases.Manager
+	ContentNamespace          string
 }
 
 // NewImageService returns a new ImageService from a configuration
@@ -54,12 +59,15 @@
 		distributionMetadataStore: config.DistributionMetadataStore,
 		downloadManager:           xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
 		eventsService:             config.EventsService,
-		imageStore:                config.ImageStore,
+		imageStore:                &imageStoreWithLease{Store: config.ImageStore, leases: config.Leases, ns: config.ContentNamespace},
 		layerStores:               config.LayerStores,
 		referenceStore:            config.ReferenceStore,
 		registryService:           config.RegistryService,
 		trustKey:                  config.TrustKey,
 		uploadManager:             xfer.NewLayerUploadManager(config.MaxConcurrentUploads),
+		leases:                    config.Leases,
+		content:                   config.ContentStore,
+		contentNamespace:          config.ContentNamespace,
 	}
 }
 
@@ -76,6 +84,9 @@
 	registryService           registry.Service
 	trustKey                  libtrust.PrivateKey
 	uploadManager             *xfer.LayerUploadManager
+	leases                    leases.Manager
+	content                   content.Store
+	contentNamespace          string
 }
 
 // DistributionServices provides daemon image storage services
diff --git a/daemon/images/store.go b/daemon/images/store.go
new file mode 100644
index 0000000..2c8d481
--- /dev/null
+++ b/daemon/images/store.go
@@ -0,0 +1,155 @@
+package images
+
+import (
+	"context"
+	"sync"
+
+	"github.com/containerd/containerd/content"
+	c8derrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/namespaces"
+	"github.com/docker/docker/distribution"
+	"github.com/docker/docker/image"
+	"github.com/docker/docker/layer"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+func imageKey(dgst digest.Digest) string {
+	return "moby-image-" + dgst.String()
+}
+
+// imageStoreWithLease wraps the configured image store with one that deletes the lease
+// reigstered for a given image ID, if one exists
+//
+// This is used by the main image service to wrap delete calls to the real image store.
+type imageStoreWithLease struct {
+	image.Store
+	leases leases.Manager
+
+	// Normally we'd pass namespace down through a context.Context, however...
+	// The interface for image store doesn't allow this, so we store it here.
+	ns string
+}
+
+func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
+	ctx := namespaces.WithNamespace(context.TODO(), s.ns)
+	if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(digest.Digest(id))}); err != nil && !c8derrdefs.IsNotFound(err) {
+		return nil, errors.Wrap(err, "error deleting lease")
+	}
+	return s.Store.Delete(id)
+}
+
+// iamgeStoreForPull is created for each pull It wraps an underlying image store
+// to handle registering leases for content fetched in a single image pull.
+type imageStoreForPull struct {
+	distribution.ImageConfigStore
+	leases   leases.Manager
+	ingested *contentStoreForPull
+}
+
+func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
+	id, err := s.ImageConfigStore.Put(ctx, config)
+	if err != nil {
+		return "", err
+	}
+	return id, s.updateLease(ctx, id)
+}
+
+func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
+	id, err := s.ImageConfigStore.Get(ctx, dgst)
+	if err != nil {
+		return nil, err
+	}
+	return id, s.updateLease(ctx, dgst)
+}
+
+func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
+	leaseID := imageKey(dgst)
+	lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
+	if err != nil {
+		if !c8derrdefs.IsAlreadyExists(err) {
+			return errors.Wrap(err, "error creating lease")
+		}
+		lease = leases.Lease{ID: leaseID}
+	}
+
+	digested := s.ingested.getDigested()
+	resource := leases.Resource{
+		Type: "content",
+	}
+	for _, dgst := range digested {
+		log.G(ctx).WithFields(logrus.Fields{
+			"digest": dgst,
+			"lease":  lease.ID,
+		}).Debug("Adding content digest to lease")
+
+		resource.ID = dgst.String()
+		if err := s.leases.AddResource(ctx, lease, resource); err != nil {
+			return errors.Wrapf(err, "error adding content digest to lease: %s", dgst)
+		}
+	}
+	return nil
+}
+
+// contentStoreForPull is used to wrap the configured content store to
+// add lease management for a single `pull`
+// It stores all committed digests so that `imageStoreForPull` can add
+// the digsted resources to the lease for an image.
+type contentStoreForPull struct {
+	distribution.ContentStore
+	leases leases.Manager
+
+	mu       sync.Mutex
+	digested []digest.Digest
+}
+
+func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
+	c.mu.Lock()
+	c.digested = append(c.digested, dgst)
+	c.mu.Unlock()
+}
+
+func (c *contentStoreForPull) getDigested() []digest.Digest {
+	c.mu.Lock()
+	digested := make([]digest.Digest, len(c.digested))
+	copy(digested, c.digested)
+	c.mu.Unlock()
+	return digested
+}
+
+func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
+	w, err := c.ContentStore.Writer(ctx, opts...)
+	if err != nil {
+		if c8derrdefs.IsAlreadyExists(err) {
+			var cfg content.WriterOpts
+			for _, o := range opts {
+				if err := o(&cfg); err != nil {
+					return nil, err
+				}
+
+			}
+			c.addDigested(cfg.Desc.Digest)
+		}
+		return nil, err
+	}
+	return &contentWriter{
+		cs:     c,
+		Writer: w,
+	}, nil
+}
+
+type contentWriter struct {
+	cs *contentStoreForPull
+	content.Writer
+}
+
+func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
+	err := w.Writer.Commit(ctx, size, expected, opts...)
+	if err == nil || c8derrdefs.IsAlreadyExists(err) {
+		w.cs.addDigested(expected)
+	}
+	return err
+}
diff --git a/daemon/images/store_test.go b/daemon/images/store_test.go
new file mode 100644
index 0000000..212e362
--- /dev/null
+++ b/daemon/images/store_test.go
@@ -0,0 +1,124 @@
+package images
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/content/local"
+	c8derrdefs "github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/leases"
+	"github.com/containerd/containerd/metadata"
+	"github.com/containerd/containerd/namespaces"
+	"github.com/docker/docker/image"
+	digest "github.com/opencontainers/go-digest"
+	v1 "github.com/opencontainers/image-spec/specs-go/v1"
+	"go.etcd.io/bbolt"
+	"gotest.tools/v3/assert"
+	"gotest.tools/v3/assert/cmp"
+)
+
+func setupTestStores(t *testing.T) (context.Context, content.Store, *imageStoreWithLease, func(t *testing.T)) {
+	dir, err := ioutil.TempDir("", t.Name())
+	assert.NilError(t, err)
+
+	backend, err := image.NewFSStoreBackend(filepath.Join(dir, "images"))
+	assert.NilError(t, err)
+	is, err := image.NewImageStore(backend, nil)
+	assert.NilError(t, err)
+
+	db, err := bbolt.Open(filepath.Join(dir, "metadata.db"), 0600, nil)
+	assert.NilError(t, err)
+
+	cs, err := local.NewStore(filepath.Join(dir, "content"))
+	assert.NilError(t, err)
+	mdb := metadata.NewDB(db, cs, nil)
+
+	cleanup := func(t *testing.T) {
+		assert.Check(t, db.Close())
+		assert.Check(t, os.RemoveAll(dir))
+	}
+	ctx := namespaces.WithNamespace(context.Background(), t.Name())
+	images := &imageStoreWithLease{Store: is, ns: t.Name(), leases: metadata.NewLeaseManager(mdb)}
+
+	return ctx, cs, images, cleanup
+}
+
+func TestImageDelete(t *testing.T) {
+	ctx, _, images, cleanup := setupTestStores(t)
+	defer cleanup(t)
+
+	t.Run("no lease", func(t *testing.T) {
+		id, err := images.Create([]byte(`{"rootFS": {}}`))
+		assert.NilError(t, err)
+		defer images.Delete(id)
+
+		ls, err := images.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Equal(t, len(ls), 0, ls)
+
+		_, err = images.Delete(id)
+		assert.NilError(t, err, "should not error when there is no lease")
+	})
+
+	t.Run("lease exists", func(t *testing.T) {
+		id, err := images.Create([]byte(`{"rootFS": {}}`))
+		assert.NilError(t, err)
+		defer images.Delete(id)
+
+		leaseID := imageKey(digest.Digest(id))
+		_, err = images.leases.Create(ctx, leases.WithID(leaseID))
+		assert.NilError(t, err)
+		defer images.leases.Delete(ctx, leases.Lease{ID: leaseID})
+
+		ls, err := images.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(len(ls), 1), ls)
+
+		_, err = images.Delete(id)
+		assert.NilError(t, err)
+
+		ls, err = images.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(len(ls), 0), ls)
+	})
+}
+
+func TestContentStoreForPull(t *testing.T) {
+	ctx, cs, is, cleanup := setupTestStores(t)
+	defer cleanup(t)
+
+	csP := &contentStoreForPull{
+		ContentStore: cs,
+		leases:       is.leases,
+	}
+
+	data := []byte(`{}`)
+	desc := v1.Descriptor{
+		Digest: digest.Canonical.FromBytes(data),
+		Size:   int64(len(data)),
+	}
+
+	w, err := csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
+	assert.NilError(t, err)
+
+	_, err = w.Write(data)
+	assert.NilError(t, err)
+	defer w.Close()
+
+	err = w.Commit(ctx, desc.Size, desc.Digest)
+	assert.NilError(t, err)
+
+	assert.Equal(t, len(csP.digested), 1)
+	assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest))
+
+	// Test already exists
+	csP.digested = nil
+	_, err = csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
+	assert.Check(t, c8derrdefs.IsAlreadyExists(err))
+	assert.Equal(t, len(csP.digested), 1)
+	assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest))
+}
diff --git a/distribution/config.go b/distribution/config.go
index f96df7b..2e3dd7b 100644
--- a/distribution/config.go
+++ b/distribution/config.go
@@ -84,8 +84,8 @@
 // by digest. Allows getting an image configurations rootfs from the
 // configuration.
 type ImageConfigStore interface {
-	Put([]byte) (digest.Digest, error)
-	Get(digest.Digest) ([]byte, error)
+	Put(context.Context, []byte) (digest.Digest, error)
+	Get(context.Context, digest.Digest) ([]byte, error)
 	RootFSFromConfig([]byte) (*image.RootFS, error)
 	PlatformFromConfig([]byte) (*specs.Platform, error)
 }
@@ -128,12 +128,12 @@
 	}
 }
 
-func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) {
+func (s *imageConfigStore) Put(_ context.Context, c []byte) (digest.Digest, error) {
 	id, err := s.Store.Create(c)
 	return digest.Digest(id), err
 }
 
-func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) {
+func (s *imageConfigStore) Get(_ context.Context, d digest.Digest) ([]byte, error) {
 	img, err := s.Store.Get(image.IDFromDigest(d))
 	if err != nil {
 		return nil, err
diff --git a/distribution/manifest.go b/distribution/manifest.go
new file mode 100644
index 0000000..a97373b
--- /dev/null
+++ b/distribution/manifest.go
@@ -0,0 +1,195 @@
+package distribution
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"io/ioutil"
+
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/remotes"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/manifest/schema1"
+	digest "github.com/opencontainers/go-digest"
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
+)
+
+// This is used by manifestStore to pare down the requirements to implement a
+// full distribution.ManifestService, since `Get` is all we use here.
+type manifestGetter interface {
+	Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error)
+}
+
+type manifestStore struct {
+	local  ContentStore
+	remote manifestGetter
+}
+
+// ContentStore is the interface used to persist registry blobs
+//
+// Currently this is only used to persist manifests and manifest lists.
+// It is exported because `distribution.Pull` takes one as an argument.
+type ContentStore interface {
+	content.Ingester
+	content.Provider
+	Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
+	Abort(ctx context.Context, ref string) error
+}
+
+func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
+	ra, err := m.local.ReaderAt(ctx, desc)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting content store reader")
+	}
+	defer ra.Close()
+
+	r := io.NewSectionReader(ra, 0, ra.Size())
+	data, err := ioutil.ReadAll(r)
+	if err != nil {
+		return nil, errors.Wrap(err, "error reading manifest from content store")
+	}
+
+	manifest, _, err := distribution.UnmarshalManifest(desc.MediaType, data)
+	if err != nil {
+		return nil, errors.Wrap(err, "error unmarshaling manifest from content store")
+	}
+	return manifest, nil
+}
+
+func (m *manifestStore) getMediaType(ctx context.Context, desc specs.Descriptor) (string, error) {
+	ra, err := m.local.ReaderAt(ctx, desc)
+	if err != nil {
+		return "", errors.Wrap(err, "error getting reader to detect media type")
+	}
+	defer ra.Close()
+
+	mt, err := detectManifestMediaType(ra)
+	if err != nil {
+		return "", errors.Wrap(err, "error detecting media type")
+	}
+	return mt, nil
+}
+
+func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
+	l := log.G(ctx)
+
+	if desc.MediaType == "" {
+		// When pulling by digest we will not have the media type on the
+		// descriptor since we have not made a request to the registry yet
+		//
+		// We already have the digest, so we only lookup locally... by digest.
+		//
+		// Let's try to detect the media type so we can have a good ref key
+		// here. We may not even have the content locally, and this is fine, but
+		// if we do we should determine that.
+		mt, err := m.getMediaType(ctx, desc)
+		if err != nil && !errdefs.IsNotFound(err) {
+			l.WithError(err).Warn("Error looking up media type of content")
+		}
+		desc.MediaType = mt
+	}
+
+	key := remotes.MakeRefKey(ctx, desc)
+
+	// Here we open a writer to the requested content. This both gives us a
+	// reference to write to if indeed we need to persist it and increments the
+	// ref count on the content.
+	w, err := m.local.Writer(ctx, content.WithDescriptor(desc), content.WithRef(key))
+	if err != nil {
+		if errdefs.IsAlreadyExists(err) {
+			var manifest distribution.Manifest
+			if manifest, err = m.getLocal(ctx, desc); err == nil {
+				return manifest, nil
+			}
+		}
+		// always fallback to the remote if there is an error with the local store
+	}
+	if w != nil {
+		defer w.Close()
+	}
+
+	l.WithError(err).Debug("Fetching manifest from remote")
+
+	manifest, err := m.remote.Get(ctx, desc.Digest)
+	if err != nil {
+		if err := m.local.Abort(ctx, key); err != nil {
+			l.WithError(err).Warn("Error while attempting to abort content ingest")
+		}
+		return nil, err
+	}
+
+	if w != nil {
+		// if `w` is nil here, something happened with the content store, so don't bother trying to persist.
+		if err := m.Put(ctx, manifest, desc, w); err != nil {
+			if err := m.local.Abort(ctx, key); err != nil {
+				l.WithError(err).Warn("error aborting content ingest")
+			}
+			l.WithError(err).Warn("Error persisting manifest")
+		}
+	}
+	return manifest, nil
+}
+
+func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer) error {
+	mt, payload, err := manifest.Payload()
+	if err != nil {
+		return err
+	}
+	desc.Size = int64(len(payload))
+	desc.MediaType = mt
+
+	if _, err = w.Write(payload); err != nil {
+		return errors.Wrap(err, "error writing manifest to content store")
+	}
+
+	if err := w.Commit(ctx, desc.Size, desc.Digest); err != nil {
+		return errors.Wrap(err, "error committing manifest to content store")
+	}
+	return nil
+}
+
+func detectManifestMediaType(ra content.ReaderAt) (string, error) {
+	dt := make([]byte, ra.Size())
+	if _, err := ra.ReadAt(dt, 0); err != nil {
+		return "", err
+	}
+
+	return detectManifestBlobMediaType(dt)
+}
+
+// This is used when the manifest store does not know the media type of a sha it
+// was told to get. This would currently only happen when pulling by digest.
+// The media type is needed so the blob can be unmarshalled properly.
+func detectManifestBlobMediaType(dt []byte) (string, error) {
+	var mfst struct {
+		MediaType string          `json:"mediaType"`
+		Config    json.RawMessage `json:"config"`   // schema2 Manifest
+		FSLayers  json.RawMessage `json:"fsLayers"` // schema1 Manifest
+	}
+
+	if err := json.Unmarshal(dt, &mfst); err != nil {
+		return "", err
+	}
+
+	// We may have a media type specified in the json, in which case that should be used.
+	// Docker types should generally have a media type set.
+	// OCI (golang) types do not have a `mediaType` defined, and it is optional in the spec.
+	//
+	// `distrubtion.UnmarshalManifest`, which is used to unmarshal this for real, checks these media type values.
+	// If the specified media type does not match it will error, and in some cases (docker media types) it is required.
+	// So pretty much if we don't have a media type we can fall back to OCI.
+	// This does have a special fallback for schema1 manifests just because it is easy to detect.
+	switch {
+	case mfst.MediaType != "":
+		return mfst.MediaType, nil
+	case mfst.FSLayers != nil:
+		return schema1.MediaTypeManifest, nil
+	case mfst.Config != nil:
+		return specs.MediaTypeImageManifest, nil
+	default:
+		return specs.MediaTypeImageIndex, nil
+	}
+}
diff --git a/distribution/manifest_test.go b/distribution/manifest_test.go
new file mode 100644
index 0000000..0976a71
--- /dev/null
+++ b/distribution/manifest_test.go
@@ -0,0 +1,351 @@
+package distribution
+
+import (
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/containerd/containerd/content"
+	"github.com/containerd/containerd/content/local"
+	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/remotes"
+	"github.com/docker/distribution"
+	"github.com/docker/distribution/manifest/ocischema"
+	"github.com/docker/distribution/manifest/schema1"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	digest "github.com/opencontainers/go-digest"
+	specs "github.com/opencontainers/image-spec/specs-go/v1"
+	"github.com/pkg/errors"
+	"gotest.tools/v3/assert"
+	"gotest.tools/v3/assert/cmp"
+)
+
+type mockManifestGetter struct {
+	manifests map[digest.Digest]distribution.Manifest
+	gets      int
+}
+
+func (m *mockManifestGetter) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
+	m.gets++
+	manifest, ok := m.manifests[dgst]
+	if !ok {
+		return nil, distribution.ErrManifestUnknown{Tag: dgst.String()}
+	}
+	return manifest, nil
+}
+
+type memoryLabelStore struct {
+	mu     sync.Mutex
+	labels map[digest.Digest]map[string]string
+}
+
+// Get returns all the labels for the given digest
+func (s *memoryLabelStore) Get(dgst digest.Digest) (map[string]string, error) {
+	s.mu.Lock()
+	labels := s.labels[dgst]
+	s.mu.Unlock()
+	return labels, nil
+}
+
+// Set sets all the labels for a given digest
+func (s *memoryLabelStore) Set(dgst digest.Digest, labels map[string]string) error {
+	s.mu.Lock()
+	if s.labels == nil {
+		s.labels = make(map[digest.Digest]map[string]string)
+	}
+	s.labels[dgst] = labels
+	s.mu.Unlock()
+	return nil
+}
+
+// Update replaces the given labels for a digest,
+// a key with an empty value removes a label.
+func (s *memoryLabelStore) Update(dgst digest.Digest, update map[string]string) (map[string]string, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	labels, ok := s.labels[dgst]
+	if !ok {
+		labels = map[string]string{}
+	}
+	for k, v := range update {
+		labels[k] = v
+	}
+
+	s.labels[dgst] = labels
+
+	return labels, nil
+}
+
+type testingContentStoreWrapper struct {
+	ContentStore
+	errorOnWriter error
+	errorOnCommit error
+}
+
+func (s *testingContentStoreWrapper) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
+	if s.errorOnWriter != nil {
+		return nil, s.errorOnWriter
+	}
+
+	w, err := s.ContentStore.Writer(ctx, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	if s.errorOnCommit != nil {
+		w = &testingContentWriterWrapper{w, s.errorOnCommit}
+	}
+	return w, nil
+}
+
+type testingContentWriterWrapper struct {
+	content.Writer
+	err error
+}
+
+func (w *testingContentWriterWrapper) Commit(ctx context.Context, size int64, dgst digest.Digest, opts ...content.Opt) error {
+	if w.err != nil {
+		// The contract for `Commit` is to always close.
+		// Since this is returning early before hitting the real `Commit`, we should close it here.
+		w.Close()
+		return w.err
+	}
+	return w.Writer.Commit(ctx, size, dgst, opts...)
+}
+
+func TestManifestStore(t *testing.T) {
+	ociManifest := &specs.Manifest{}
+	serialized, err := json.Marshal(ociManifest)
+	assert.NilError(t, err)
+	dgst := digest.Canonical.FromBytes(serialized)
+
+	setupTest := func(t *testing.T) (specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) {
+		root, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1))
+		assert.NilError(t, err)
+		defer func() {
+			if t.Failed() {
+				os.RemoveAll(root)
+			}
+		}()
+
+		cs, err := local.NewLabeledStore(root, &memoryLabelStore{})
+		assert.NilError(t, err)
+
+		mg := &mockManifestGetter{manifests: make(map[digest.Digest]distribution.Manifest)}
+		store := &manifestStore{local: cs, remote: mg}
+		desc := specs.Descriptor{Digest: dgst, MediaType: specs.MediaTypeImageManifest, Size: int64(len(serialized))}
+
+		return desc, mg, store, cs, func(t *testing.T) {
+			assert.Check(t, os.RemoveAll(root))
+		}
+	}
+
+	ctx := context.Background()
+
+	m, _, err := distribution.UnmarshalManifest(specs.MediaTypeImageManifest, serialized)
+	assert.NilError(t, err)
+
+	writeManifest := func(t *testing.T, cs ContentStore, desc specs.Descriptor, opts ...content.Opt) {
+		ingestKey := remotes.MakeRefKey(ctx, desc)
+		w, err := cs.Writer(ctx, content.WithDescriptor(desc), content.WithRef(ingestKey))
+		assert.NilError(t, err)
+		defer func() {
+			if err := w.Close(); err != nil {
+				t.Log(err)
+			}
+			if t.Failed() {
+				if err := cs.Abort(ctx, ingestKey); err != nil {
+					t.Log(err)
+				}
+			}
+		}()
+
+		_, err = w.Write(serialized)
+		assert.NilError(t, err)
+
+		err = w.Commit(ctx, desc.Size, desc.Digest, opts...)
+		assert.NilError(t, err)
+
+	}
+
+	// All tests should end up with no active ingest
+	checkIngest := func(t *testing.T, cs content.Store, desc specs.Descriptor) {
+		ingestKey := remotes.MakeRefKey(ctx, desc)
+		_, err := cs.Status(ctx, ingestKey)
+		assert.Check(t, errdefs.IsNotFound(err), err)
+	}
+
+	t.Run("no remote or local", func(t *testing.T) {
+		desc, _, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		_, err = store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		// This error is what our digest getter returns when it doesn't know about the manifest
+		assert.Error(t, err, distribution.ErrManifestUnknown{Tag: dgst.String()}.Error())
+	})
+
+	t.Run("no local cache", func(t *testing.T) {
+		desc, mg, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		mg.manifests[desc.Digest] = m
+
+		m2, err := store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 1))
+
+		i, err := cs.Info(ctx, desc.Digest)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
+
+		// Now check again, this should not hit the remote
+		m2, err = store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 1))
+	})
+
+	t.Run("with local cache", func(t *testing.T) {
+		desc, mg, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		// first add the manifest to the coontent store
+		writeManifest(t, cs, desc)
+
+		// now do the get
+		m2, err := store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 0))
+
+		i, err := cs.Info(ctx, desc.Digest)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
+	})
+
+	// This is for the case of pull by digest where we don't know the media type of the manifest until it's actually pulled.
+	t.Run("unknown media type", func(t *testing.T) {
+		t.Run("no cache", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+
+			mg.manifests[desc.Digest] = m
+			desc.MediaType = ""
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+		})
+
+		t.Run("with cache", func(t *testing.T) {
+			t.Run("cached manifest has media type", func(t *testing.T) {
+				desc, mg, store, cs, teardown := setupTest(t)
+				defer teardown(t)
+
+				writeManifest(t, cs, desc)
+				desc.MediaType = ""
+
+				m2, err := store.Get(ctx, desc)
+				checkIngest(t, cs, desc)
+				assert.NilError(t, err)
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+				assert.Check(t, cmp.Equal(mg.gets, 0))
+			})
+
+			t.Run("cached manifest has no media type", func(t *testing.T) {
+				desc, mg, store, cs, teardown := setupTest(t)
+				defer teardown(t)
+
+				desc.MediaType = ""
+				writeManifest(t, cs, desc)
+
+				m2, err := store.Get(ctx, desc)
+				checkIngest(t, cs, desc)
+				assert.NilError(t, err)
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+				assert.Check(t, cmp.Equal(mg.gets, 0))
+			})
+		})
+	})
+
+	// Test that if there is an error with the content store, for whatever
+	// reason, that doesn't stop us from getting the manifest.
+	//
+	// Also makes sure the ingests are aborted.
+	t.Run("error persisting manifest", func(t *testing.T) {
+		t.Run("error on writer", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+			mg.manifests[desc.Digest] = m
+
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnWriter: errors.New("random error")}
+			store.local = csW
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+
+			_, err = cs.Info(ctx, desc.Digest)
+			// Nothing here since we couldn't persist
+			assert.Check(t, errdefs.IsNotFound(err), err)
+		})
+
+		t.Run("error on commit", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+			mg.manifests[desc.Digest] = m
+
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnCommit: errors.New("random error")}
+			store.local = csW
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+
+			_, err = cs.Info(ctx, desc.Digest)
+			// Nothing here since we couldn't persist
+			assert.Check(t, errdefs.IsNotFound(err), err)
+		})
+	})
+}
+
+func TestDetectManifestBlobMediaType(t *testing.T) {
+	type testCase struct {
+		json     []byte
+		expected string
+	}
+	cases := map[string]testCase{
+		"mediaType is set":   {[]byte(`{"mediaType": "bananas"}`), "bananas"},
+		"oci manifest":       {[]byte(`{"config": {}}`), specs.MediaTypeImageManifest},
+		"schema1":            {[]byte(`{"fsLayers": []}`), schema1.MediaTypeManifest},
+		"oci index fallback": {[]byte(`{}`), specs.MediaTypeImageIndex},
+		// Make sure we prefer mediaType
+		"mediaType and config set":   {[]byte(`{"mediaType": "bananas", "config": {}}`), "bananas"},
+		"mediaType and fsLayers set": {[]byte(`{"mediaType": "bananas", "fsLayers": []}`), "bananas"},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			mt, err := detectManifestBlobMediaType(tc.json)
+			assert.NilError(t, err)
+			assert.Equal(t, mt, tc.expected)
+		})
+	}
+
+}
diff --git a/distribution/pull.go b/distribution/pull.go
index ecf2f98..c8ddd4c 100644
--- a/distribution/pull.go
+++ b/distribution/pull.go
@@ -29,7 +29,7 @@
 // whether a v1 or v2 puller will be created. The other parameters are passed
 // through to the underlying puller implementation for use during the actual
 // pull operation.
-func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig) (Puller, error) {
+func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) {
 	switch endpoint.Version {
 	case registry.APIVersion2:
 		return &v2Puller{
@@ -37,6 +37,9 @@
 			endpoint:          endpoint,
 			config:            imagePullConfig,
 			repoInfo:          repoInfo,
+			manifestStore: &manifestStore{
+				local: local,
+			},
 		}, nil
 	case registry.APIVersion1:
 		return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
@@ -46,7 +49,7 @@
 
 // Pull initiates a pull operation. image is the repository name to pull, and
 // tag may be either empty, or indicate a specific tag to pull.
-func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig) error {
+func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error {
 	// Resolve the Repository name from fqn to RepositoryInfo
 	repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
 	if err != nil {
@@ -104,7 +107,7 @@
 
 		logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version)
 
-		puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
+		puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local)
 		if err != nil {
 			lastErr = err
 			continue
diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go
index 54b66d0..b5db4ee 100644
--- a/distribution/pull_v2.go
+++ b/distribution/pull_v2.go
@@ -11,6 +11,7 @@
 	"runtime"
 	"strings"
 
+	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/platforms"
 	"github.com/docker/distribution"
 	"github.com/docker/distribution/manifest/manifestlist"
@@ -62,7 +63,8 @@
 	repo              distribution.Repository
 	// confirmedV2 is set to true if we confirm we're talking to a v2
 	// registry. This is used to limit fallbacks to the v1 protocol.
-	confirmedV2 bool
+	confirmedV2   bool
+	manifestStore *manifestStore
 }
 
 func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *specs.Platform) (err error) {
@@ -73,6 +75,11 @@
 		return err
 	}
 
+	p.manifestStore.remote, err = p.repo.Manifests(ctx)
+	if err != nil {
+		return err
+	}
+
 	if err = p.pullV2Repository(ctx, ref, platform); err != nil {
 		if _, ok := err.(fallbackError); ok {
 			return err
@@ -330,31 +337,45 @@
 }
 
 func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
-	manSvc, err := p.repo.Manifests(ctx)
-	if err != nil {
-		return false, err
-	}
 
 	var (
-		manifest    distribution.Manifest
 		tagOrDigest string // Used for logging/progress only
+		dgst        digest.Digest
+		mt          string
+		size        int64
 	)
 	if digested, isDigested := ref.(reference.Canonical); isDigested {
-		manifest, err = manSvc.Get(ctx, digested.Digest())
-		if err != nil {
-			return false, err
-		}
-		tagOrDigest = digested.Digest().String()
+		dgst = digested.Digest()
+		tagOrDigest = digested.String()
 	} else if tagged, isTagged := ref.(reference.NamedTagged); isTagged {
-		manifest, err = manSvc.Get(ctx, "", distribution.WithTag(tagged.Tag()))
+		tagService := p.repo.Tags(ctx)
+		desc, err := tagService.Get(ctx, tagged.Tag())
 		if err != nil {
 			return false, allowV1Fallback(err)
 		}
+		dgst = desc.Digest
 		tagOrDigest = tagged.Tag()
+		mt = desc.MediaType
+		size = desc.Size
 	} else {
 		return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", reference.FamiliarString(ref))
 	}
 
+	ctx = log.WithLogger(ctx, logrus.WithFields(
+		logrus.Fields{
+			"digest": dgst,
+			"remote": ref,
+		}))
+
+	manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
+		MediaType: mt,
+		Digest:    dgst,
+		Size:      size,
+	})
+	if err != nil {
+		return false, err
+	}
+
 	if manifest == nil {
 		return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
 	}
@@ -553,7 +574,7 @@
 		return "", "", err
 	}
 
-	imageID, err := p.config.ImageStore.Put(config)
+	imageID, err := p.config.ImageStore.Put(ctx, config)
 	if err != nil {
 		return "", "", err
 	}
@@ -564,7 +585,7 @@
 }
 
 func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
-	if _, err := p.config.ImageStore.Get(target.Digest); err == nil {
+	if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
 		// If the image already exists locally, no need to pull
 		// anything.
 		return target.Digest, nil
@@ -721,7 +742,7 @@
 		}
 	}
 
-	imageID, err := p.config.ImageStore.Put(configJSON)
+	imageID, err := p.config.ImageStore.Put(ctx, configJSON)
 	if err != nil {
 		return "", err
 	}
@@ -791,23 +812,22 @@
 	if len(manifestMatches) > 1 {
 		logrus.Debugf("found multiple matches in manifest list, choosing best match %s", manifestMatches[0].Digest.String())
 	}
-	manifestDigest := manifestMatches[0].Digest
+	match := manifestMatches[0]
 
-	if err := checkImageCompatibility(manifestMatches[0].Platform.OS, manifestMatches[0].Platform.OSVersion); err != nil {
+	if err := checkImageCompatibility(match.Platform.OS, match.Platform.OSVersion); err != nil {
 		return "", "", err
 	}
 
-	manSvc, err := p.repo.Manifests(ctx)
+	manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
+		Digest:    match.Digest,
+		Size:      match.Size,
+		MediaType: match.MediaType,
+	})
 	if err != nil {
 		return "", "", err
 	}
 
-	manifest, err := manSvc.Get(ctx, manifestDigest)
-	if err != nil {
-		return "", "", err
-	}
-
-	manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), manifestDigest)
+	manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), match.Digest)
 	if err != nil {
 		return "", "", err
 	}
diff --git a/distribution/push_v2.go b/distribution/push_v2.go
index b64230f..764149c 100644
--- a/distribution/push_v2.go
+++ b/distribution/push_v2.go
@@ -116,7 +116,7 @@
 func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
 	logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
 
-	imgConfig, err := p.config.ImageStore.Get(id)
+	imgConfig, err := p.config.ImageStore.Get(ctx, id)
 	if err != nil {
 		return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
 	}
diff --git a/distribution/registry_unit_test.go b/distribution/registry_unit_test.go
index 3651c46..60fbeb9 100644
--- a/distribution/registry_unit_test.go
+++ b/distribution/registry_unit_test.go
@@ -69,7 +69,7 @@
 		},
 		Schema2Types: ImageTypes,
 	}
-	puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
+	puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
 	if err != nil {
 		t.Fatal(err)
 	}