Merge pull request #41607 from cpuguy83/use_head_for_manifest_by_tag
cache manifests on pull
diff --git a/daemon/content.go b/daemon/content.go
new file mode 100644
index 0000000..01d1e23
--- /dev/null
+++ b/daemon/content.go
@@ -0,0 +1,30 @@
+package daemon
+
+import (
+ "os"
+ "path/filepath"
+
+ "github.com/containerd/containerd/content"
+ "github.com/containerd/containerd/content/local"
+ "github.com/containerd/containerd/leases"
+ "github.com/containerd/containerd/metadata"
+ "github.com/pkg/errors"
+ "go.etcd.io/bbolt"
+)
+
+// configureLocalContentStore sets up a content store and lease manager that
+// live on local disk under the "content" directory of the daemon root.
+//
+// The content metadata is kept in a bolt database ("metadata.db") and the blobs
+// in a "data" directory next to it. The opened database handle is stored on
+// d.mdDB so that it can be closed later.
+//
+// On failure nothing is stored on the daemon and any database that was already
+// opened is closed again.
+func (d *Daemon) configureLocalContentStore() (content.Store, leases.Manager, error) {
+	contentRoot := filepath.Join(d.root, "content")
+	if err := os.MkdirAll(contentRoot, 0700); err != nil {
+		return nil, nil, errors.Wrap(err, "error creating dir for content store")
+	}
+	db, err := bbolt.Open(filepath.Join(contentRoot, "metadata.db"), 0600, nil)
+	if err != nil {
+		return nil, nil, errors.Wrap(err, "error opening bolt db for content metadata store")
+	}
+	cs, err := local.NewStore(filepath.Join(contentRoot, "data"))
+	if err != nil {
+		// Do not leak the open database handle on the failure path; the
+		// original error is the one worth reporting.
+		db.Close()
+		return nil, nil, errors.Wrap(err, "error setting up content store")
+	}
+	md := metadata.NewDB(db, cs, nil)
+	d.mdDB = db
+	return md.ContentStore(), metadata.NewLeaseManager(md), nil
+}
diff --git a/daemon/daemon.go b/daemon/daemon.go
index f81f45a..9e3a0fa 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -20,6 +20,7 @@
"time"
"github.com/docker/docker/pkg/fileutils"
+ "go.etcd.io/bbolt"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
@@ -129,6 +130,11 @@
attachmentStore network.AttachmentStore
attachableNetworkLock *locker.Locker
+
+ // This is used for Windows which doesn't currently support running on containerd
+ // It stores metadata for the content store (used for manifest caching)
+ // This needs to be closed on daemon exit
+ mdDB *bbolt.DB
}
// StoreHosts stores the addresses the daemon is listening on
@@ -1066,10 +1072,7 @@
d.linkIndex = newLinkIndex()
- // TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
- // used above to run migration. They could be initialized in ImageService
- // if migration is called from daemon/images. layerStore might move as well.
- d.imageService = images.NewImageService(images.ImageServiceConfig{
+ imgSvcConfig := images.ImageServiceConfig{
ContainerStore: d.containers,
DistributionMetadataStore: distributionMetadataStore,
EventsService: d.EventsService,
@@ -1081,7 +1084,28 @@
ReferenceStore: rs,
RegistryService: registryService,
TrustKey: trustKey,
- })
+ ContentNamespace: config.ContainerdNamespace,
+ }
+
+ // containerd is not currently supported with Windows.
+ // So sometimes d.containerdCli will be nil
+ // In that case we'll create a local content store... but otherwise we'll use containerd
+ if d.containerdCli != nil {
+ imgSvcConfig.Leases = d.containerdCli.LeasesService()
+ imgSvcConfig.ContentStore = d.containerdCli.ContentStore()
+ } else {
+ cs, lm, err := d.configureLocalContentStore()
+ if err != nil {
+ return nil, err
+ }
+ imgSvcConfig.ContentStore = cs
+ imgSvcConfig.Leases = lm
+ }
+
+ // TODO: imageStore, distributionMetadataStore, and ReferenceStore are only
+ // used above to run migration. They could be initialized in ImageService
+ // if migration is called from daemon/images. layerStore might move as well.
+ d.imageService = images.NewImageService(imgSvcConfig)
go d.execCommandGC()
@@ -1246,6 +1270,10 @@
daemon.containerdCli.Close()
}
+ if daemon.mdDB != nil {
+ daemon.mdDB.Close()
+ }
+
return daemon.cleanupMounts()
}
diff --git a/daemon/images/image_pull.go b/daemon/images/image_pull.go
index 4eedfdd..ed9099b 100644
--- a/daemon/images/image_pull.go
+++ b/daemon/images/image_pull.go
@@ -6,6 +6,8 @@
"strings"
"time"
+ "github.com/containerd/containerd/leases"
+ "github.com/containerd/containerd/namespaces"
dist "github.com/docker/distribution"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
@@ -16,6 +18,7 @@
"github.com/docker/docker/registry"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
+ "github.com/pkg/errors"
)
// PullImage initiates a pull operation. image is the repository name to pull, and
@@ -65,6 +68,25 @@
close(writesDone)
}()
+ ctx = namespaces.WithNamespace(ctx, i.contentNamespace)
+ // Take out a temporary lease for everything that gets persisted to the content store.
+ // Before the lease is cancelled, any content we want to keep should have its own lease applied.
+ ctx, done, err := tempLease(ctx, i.leases)
+ if err != nil {
+ return err
+ }
+ defer done(ctx)
+
+ cs := &contentStoreForPull{
+ ContentStore: i.content,
+ leases: i.leases,
+ }
+ imageStore := &imageStoreForPull{
+ ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
+ ingested: cs,
+ leases: i.leases,
+ }
+
imagePullConfig := &distribution.ImagePullConfig{
Config: distribution.Config{
MetaHeaders: metaHeaders,
@@ -73,7 +95,7 @@
RegistryService: i.registryService,
ImageEventLogger: i.LogImageEvent,
MetadataStore: i.distributionMetadataStore,
- ImageStore: distribution.NewImageConfigStoreFromStore(i.imageStore),
+ ImageStore: imageStore,
ReferenceStore: i.referenceStore,
},
DownloadManager: i.downloadManager,
@@ -81,7 +103,7 @@
Platform: platform,
}
- err := distribution.Pull(ctx, ref, imagePullConfig)
+ err = distribution.Pull(ctx, ref, imagePullConfig, cs)
close(progressChan)
<-writesDone
return err
@@ -124,3 +146,29 @@
}
return repository, confirmedV2, lastError
}
+
+// tempLease makes sure ctx carries a lease.
+//
+// If ctx already has a lease it is returned untouched together with a no-op
+// release function. Otherwise a new lease with a random ID and a 24 hour
+// expiration is created through mgr and attached to the returned context; the
+// returned function deletes that lease again.
+func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) {
+	noop := func(context.Context) error { return nil }
+	if _, hasLease := leases.FromContext(ctx); hasLease {
+		return ctx, noop, nil
+	}
+
+	// Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc.
+	created, err := mgr.Create(ctx,
+		leases.WithRandomID(),
+		leases.WithExpiration(24*time.Hour),
+		leases.WithLabels(map[string]string{
+			"moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano),
+		}),
+	)
+	if err != nil {
+		return ctx, noop, errors.Wrap(err, "error creating temporary lease")
+	}
+
+	release := func(ctx context.Context) error {
+		return mgr.Delete(ctx, created)
+	}
+	return leases.WithLease(ctx, created.ID), release, nil
+}
diff --git a/daemon/images/service.go b/daemon/images/service.go
index 388aa2d..e0297c3 100644
--- a/daemon/images/service.go
+++ b/daemon/images/service.go
@@ -5,6 +5,8 @@
"os"
"runtime"
+ "github.com/containerd/containerd/content"
+ "github.com/containerd/containerd/leases"
"github.com/docker/docker/container"
daemonevents "github.com/docker/docker/daemon/events"
"github.com/docker/docker/distribution"
@@ -42,6 +44,9 @@
ReferenceStore dockerreference.Store
RegistryService registry.Service
TrustKey libtrust.PrivateKey
+ ContentStore content.Store
+ Leases leases.Manager
+ ContentNamespace string
}
// NewImageService returns a new ImageService from a configuration
@@ -54,12 +59,15 @@
distributionMetadataStore: config.DistributionMetadataStore,
downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)),
eventsService: config.EventsService,
- imageStore: config.ImageStore,
+ imageStore: &imageStoreWithLease{Store: config.ImageStore, leases: config.Leases, ns: config.ContentNamespace},
layerStores: config.LayerStores,
referenceStore: config.ReferenceStore,
registryService: config.RegistryService,
trustKey: config.TrustKey,
uploadManager: xfer.NewLayerUploadManager(config.MaxConcurrentUploads),
+ leases: config.Leases,
+ content: config.ContentStore,
+ contentNamespace: config.ContentNamespace,
}
}
@@ -76,6 +84,9 @@
registryService registry.Service
trustKey libtrust.PrivateKey
uploadManager *xfer.LayerUploadManager
+ leases leases.Manager
+ content content.Store
+ contentNamespace string
}
// DistributionServices provides daemon image storage services
diff --git a/daemon/images/store.go b/daemon/images/store.go
new file mode 100644
index 0000000..2c8d481
--- /dev/null
+++ b/daemon/images/store.go
@@ -0,0 +1,155 @@
+package images
+
+import (
+ "context"
+ "sync"
+
+ "github.com/containerd/containerd/content"
+ c8derrdefs "github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/leases"
+ "github.com/containerd/containerd/log"
+ "github.com/containerd/containerd/namespaces"
+ "github.com/docker/docker/distribution"
+ "github.com/docker/docker/image"
+ "github.com/docker/docker/layer"
+ digest "github.com/opencontainers/go-digest"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+)
+
+func imageKey(dgst digest.Digest) string {
+ return "moby-image-" + dgst.String()
+}
+
+// imageStoreWithLease wraps the configured image store with one that deletes the lease
+// registered for a given image ID, if one exists
+//
+// This is used by the main image service to wrap delete calls to the real image store.
+type imageStoreWithLease struct {
+ image.Store
+ // leases is the manager the per-image lease is deleted from.
+ leases leases.Manager
+
+ // Normally we'd pass namespace down through a context.Context, however...
+ // The interface for image store doesn't allow this, so we store it here.
+ ns string
+}
+
+// Delete removes the lease keyed by the image ID (a missing lease is not an
+// error) and then deletes the image from the wrapped store.
+//
+// If deleting the lease fails for any other reason the image is left in place
+// and the wrapped error is returned.
+func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
+	lease := leases.Lease{ID: imageKey(digest.Digest(id))}
+	ctx := namespaces.WithNamespace(context.TODO(), s.ns)
+
+	err := s.leases.Delete(ctx, lease)
+	if err != nil && !c8derrdefs.IsNotFound(err) {
+		return nil, errors.Wrap(err, "error deleting lease")
+	}
+	return s.Store.Delete(id)
+}
+
+// imageStoreForPull is created for each pull. It wraps an underlying image store
+// to handle registering leases for content fetched in a single image pull.
+type imageStoreForPull struct {
+ distribution.ImageConfigStore
+ // leases is the manager the per-image lease is created in.
+ leases leases.Manager
+ // ingested supplies the digests recorded during this pull; updateLease
+ // adds each of them to the image's lease.
+ ingested *contentStoreForPull
+}
+
+// Put stores config in the wrapped image config store and then attaches the
+// content recorded for this pull to the lease of the resulting image.
+//
+// The image ID is returned even when updating the lease fails.
+func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
+	imageID, err := s.ImageConfigStore.Put(ctx, config)
+	if err != nil {
+		return "", err
+	}
+	leaseErr := s.updateLease(ctx, imageID)
+	return imageID, leaseErr
+}
+
+// Get reads the image config for dgst from the wrapped store and then attaches
+// the content recorded for this pull to the lease of that image.
+//
+// The config bytes are returned even when updating the lease fails.
+func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
+	config, err := s.ImageConfigStore.Get(ctx, dgst)
+	if err != nil {
+		return nil, err
+	}
+	leaseErr := s.updateLease(ctx, dgst)
+	return config, leaseErr
+}
+
+// updateLease makes sure a lease exists for the image identified by dgst and
+// adds every content digest recorded so far during this pull to it.
+//
+// An already existing lease is reused. The first failure to add a resource
+// stops processing and is returned.
+func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
+	leaseID := imageKey(dgst)
+	lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
+	switch {
+	case err == nil:
+		// freshly created, use as returned
+	case c8derrdefs.IsAlreadyExists(err):
+		lease = leases.Lease{ID: leaseID}
+	default:
+		return errors.Wrap(err, "error creating lease")
+	}
+
+	for _, ingested := range s.ingested.getDigested() {
+		log.G(ctx).WithFields(logrus.Fields{
+			"digest": ingested,
+			"lease":  lease.ID,
+		}).Debug("Adding content digest to lease")
+
+		resource := leases.Resource{ID: ingested.String(), Type: "content"}
+		if err := s.leases.AddResource(ctx, lease, resource); err != nil {
+			return errors.Wrapf(err, "error adding content digest to lease: %s", ingested)
+		}
+	}
+	return nil
+}
+
+// contentStoreForPull is used to wrap the configured content store to
+// add lease management for a single `pull`
+// It stores all committed digests so that `imageStoreForPull` can add
+// the digested resources to the lease for an image.
+type contentStoreForPull struct {
+ distribution.ContentStore
+ leases leases.Manager
+
+ // mu guards digested.
+ mu sync.Mutex
+ digested []digest.Digest
+}
+
+// addDigested records dgst as content seen by this pull.
+func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.digested = append(c.digested, dgst)
+}
+
+// getDigested returns a copy of the digests recorded so far, so callers can
+// iterate over it without holding the lock.
+func (c *contentStoreForPull) getDigested() []digest.Digest {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	snapshot := make([]digest.Digest, len(c.digested))
+	copy(snapshot, c.digested)
+	return snapshot
+}
+
+// Writer opens a writer on the wrapped content store and wraps it so that
+// committed digests are recorded on c.
+//
+// When the wrapped store reports that the content already exists, the digest
+// from the descriptor in opts is recorded and the "already exists" error is
+// still returned to the caller. An error from applying one of opts in that
+// path is returned instead.
+func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
+	w, err := c.ContentStore.Writer(ctx, opts...)
+	if err == nil {
+		return &contentWriter{cs: c, Writer: w}, nil
+	}
+	if !c8derrdefs.IsAlreadyExists(err) {
+		return nil, err
+	}
+
+	// Resolve the options ourselves to find out which digest was asked for.
+	var cfg content.WriterOpts
+	for _, apply := range opts {
+		if optErr := apply(&cfg); optErr != nil {
+			return nil, optErr
+		}
+	}
+	c.addDigested(cfg.Desc.Digest)
+	return nil, err
+}
+
+// contentWriter wraps a content.Writer so that digests passed to Commit are
+// recorded on the contentStoreForPull that created it.
+type contentWriter struct {
+ cs *contentStoreForPull
+ content.Writer
+}
+
+// Commit commits through the wrapped writer. The expected digest is recorded
+// on the owning store when the commit succeeds or the content already exists;
+// the wrapped writer's result is returned unchanged either way.
+func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
+	commitErr := w.Writer.Commit(ctx, size, expected, opts...)
+	if commitErr != nil && !c8derrdefs.IsAlreadyExists(commitErr) {
+		return commitErr
+	}
+	w.cs.addDigested(expected)
+	return commitErr
+}
diff --git a/daemon/images/store_test.go b/daemon/images/store_test.go
new file mode 100644
index 0000000..212e362
--- /dev/null
+++ b/daemon/images/store_test.go
@@ -0,0 +1,124 @@
+package images
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/containerd/containerd/content"
+ "github.com/containerd/containerd/content/local"
+ c8derrdefs "github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/leases"
+ "github.com/containerd/containerd/metadata"
+ "github.com/containerd/containerd/namespaces"
+ "github.com/docker/docker/image"
+ digest "github.com/opencontainers/go-digest"
+ v1 "github.com/opencontainers/image-spec/specs-go/v1"
+ "go.etcd.io/bbolt"
+ "gotest.tools/v3/assert"
+ "gotest.tools/v3/assert/cmp"
+)
+
+// setupTestStores creates, inside a fresh temporary directory, an image store,
+// a local content store and a bolt-backed lease manager.
+//
+// It returns a context namespaced with the test name, the content store, the
+// image store wrapped in imageStoreWithLease, and a cleanup function that
+// closes the database and removes the directory.
+func setupTestStores(t *testing.T) (context.Context, content.Store, *imageStoreWithLease, func(t *testing.T)) {
+	root, err := ioutil.TempDir("", t.Name())
+	assert.NilError(t, err)
+
+	fsBackend, err := image.NewFSStoreBackend(filepath.Join(root, "images"))
+	assert.NilError(t, err)
+	imgStore, err := image.NewImageStore(fsBackend, nil)
+	assert.NilError(t, err)
+
+	boltDB, err := bbolt.Open(filepath.Join(root, "metadata.db"), 0600, nil)
+	assert.NilError(t, err)
+
+	contentStore, err := local.NewStore(filepath.Join(root, "content"))
+	assert.NilError(t, err)
+
+	leaseMgr := metadata.NewLeaseManager(metadata.NewDB(boltDB, contentStore, nil))
+	wrapped := &imageStoreWithLease{Store: imgStore, ns: t.Name(), leases: leaseMgr}
+	ctx := namespaces.WithNamespace(context.Background(), t.Name())
+
+	teardown := func(t *testing.T) {
+		assert.Check(t, boltDB.Close())
+		assert.Check(t, os.RemoveAll(root))
+	}
+	return ctx, contentStore, wrapped, teardown
+}
+
+// TestImageDelete checks that deleting an image through imageStoreWithLease
+// works when no lease exists for it and removes the lease when one does.
+func TestImageDelete(t *testing.T) {
+	ctx, _, store, teardown := setupTestStores(t)
+	defer teardown(t)
+
+	t.Run("no lease", func(t *testing.T) {
+		imgID, err := store.Create([]byte(`{"rootFS": {}}`))
+		assert.NilError(t, err)
+		defer store.Delete(imgID)
+
+		found, err := store.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Equal(t, len(found), 0, found)
+
+		_, err = store.Delete(imgID)
+		assert.NilError(t, err, "should not error when there is no lease")
+	})
+
+	t.Run("lease exists", func(t *testing.T) {
+		imgID, err := store.Create([]byte(`{"rootFS": {}}`))
+		assert.NilError(t, err)
+		defer store.Delete(imgID)
+
+		key := imageKey(digest.Digest(imgID))
+		_, err = store.leases.Create(ctx, leases.WithID(key))
+		assert.NilError(t, err)
+		defer store.leases.Delete(ctx, leases.Lease{ID: key})
+
+		found, err := store.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(len(found), 1), found)
+
+		_, err = store.Delete(imgID)
+		assert.NilError(t, err)
+
+		// The lease must be gone together with the image.
+		found, err = store.leases.List(ctx)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(len(found), 0), found)
+	})
+}
+
+// TestContentStoreForPull checks that contentStoreForPull records a digest
+// both when content is committed through its writer and when opening a writer
+// reports that the content already exists.
+func TestContentStoreForPull(t *testing.T) {
+	ctx, backing, imgStore, teardown := setupTestStores(t)
+	defer teardown(t)
+
+	pullStore := &contentStoreForPull{
+		ContentStore: backing,
+		leases:       imgStore.leases,
+	}
+
+	payload := []byte(`{}`)
+	desc := v1.Descriptor{
+		Digest: digest.Canonical.FromBytes(payload),
+		Size:   int64(len(payload)),
+	}
+
+	writer, err := pullStore.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
+	assert.NilError(t, err)
+
+	_, err = writer.Write(payload)
+	assert.NilError(t, err)
+	defer writer.Close()
+
+	err = writer.Commit(ctx, desc.Size, desc.Digest)
+	assert.NilError(t, err)
+
+	assert.Equal(t, len(pullStore.digested), 1)
+	assert.Check(t, cmp.Equal(pullStore.digested[0], desc.Digest))
+
+	// Test already exists
+	pullStore.digested = nil
+	_, err = pullStore.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc))
+	assert.Check(t, c8derrdefs.IsAlreadyExists(err))
+	assert.Equal(t, len(pullStore.digested), 1)
+	assert.Check(t, cmp.Equal(pullStore.digested[0], desc.Digest))
+}
diff --git a/distribution/config.go b/distribution/config.go
index f96df7b..2e3dd7b 100644
--- a/distribution/config.go
+++ b/distribution/config.go
@@ -84,8 +84,8 @@
// by digest. Allows getting an image configurations rootfs from the
// configuration.
type ImageConfigStore interface {
- Put([]byte) (digest.Digest, error)
- Get(digest.Digest) ([]byte, error)
+ Put(context.Context, []byte) (digest.Digest, error)
+ Get(context.Context, digest.Digest) ([]byte, error)
RootFSFromConfig([]byte) (*image.RootFS, error)
PlatformFromConfig([]byte) (*specs.Platform, error)
}
@@ -128,12 +128,12 @@
}
}
-func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) {
+func (s *imageConfigStore) Put(_ context.Context, c []byte) (digest.Digest, error) {
id, err := s.Store.Create(c)
return digest.Digest(id), err
}
-func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) {
+func (s *imageConfigStore) Get(_ context.Context, d digest.Digest) ([]byte, error) {
img, err := s.Store.Get(image.IDFromDigest(d))
if err != nil {
return nil, err
diff --git a/distribution/manifest.go b/distribution/manifest.go
new file mode 100644
index 0000000..a97373b
--- /dev/null
+++ b/distribution/manifest.go
@@ -0,0 +1,195 @@
+package distribution
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+ "io/ioutil"
+
+ "github.com/containerd/containerd/content"
+ "github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/log"
+ "github.com/containerd/containerd/remotes"
+ "github.com/docker/distribution"
+ "github.com/docker/distribution/manifest/schema1"
+ digest "github.com/opencontainers/go-digest"
+ specs "github.com/opencontainers/image-spec/specs-go/v1"
+ "github.com/pkg/errors"
+)
+
+// manifestGetter is used by manifestStore to pare down the requirements to implement a
+// full distribution.ManifestService, since `Get` is all we use here.
+type manifestGetter interface {
+ Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error)
+}
+
+// manifestStore serves manifests from a local content store when they are
+// already present and otherwise fetches them from a remote, persisting what
+// it fetched into the local store.
+type manifestStore struct {
+ // local is the content store manifests are read from and persisted to.
+ local ContentStore
+ // remote is asked for a manifest, by digest, when the local store cannot provide it.
+ remote manifestGetter
+}
+
+// ContentStore is the interface used to persist registry blobs
+//
+// Currently this is only used to persist manifests and manifest lists.
+// It is exported because `distribution.Pull` takes one as an argument.
+type ContentStore interface {
+ content.Ingester
+ content.Provider
+ // Info looks up the content.Info for the content identified by dgst.
+ Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
+ // Abort is called by the manifest store with the ingest ref when fetching
+ // or persisting a manifest failed.
+ Abort(ctx context.Context, ref string) error
+}
+
+// getLocal reads the content described by desc from the local content store
+// and unmarshals it as a manifest of media type desc.MediaType.
+func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
+	reader, err := m.local.ReaderAt(ctx, desc)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting content store reader")
+	}
+	defer reader.Close()
+
+	payload, err := ioutil.ReadAll(io.NewSectionReader(reader, 0, reader.Size()))
+	if err != nil {
+		return nil, errors.Wrap(err, "error reading manifest from content store")
+	}
+
+	parsed, _, err := distribution.UnmarshalManifest(desc.MediaType, payload)
+	if err != nil {
+		return nil, errors.Wrap(err, "error unmarshaling manifest from content store")
+	}
+	return parsed, nil
+}
+
+// getMediaType opens the content described by desc in the local content store
+// and detects the manifest media type from the stored bytes.
+func (m *manifestStore) getMediaType(ctx context.Context, desc specs.Descriptor) (string, error) {
+	reader, err := m.local.ReaderAt(ctx, desc)
+	if err != nil {
+		return "", errors.Wrap(err, "error getting reader to detect media type")
+	}
+	defer reader.Close()
+
+	mediaType, detectErr := detectManifestMediaType(reader)
+	if detectErr != nil {
+		return "", errors.Wrap(detectErr, "error detecting media type")
+	}
+	return mediaType, nil
+}
+
+// Get returns the manifest described by desc.
+//
+// The local content store is consulted first: if it reports that the content
+// already exists, the manifest is read and unmarshaled from there. In every
+// other case (content missing, or any error from the local store) the manifest
+// is fetched from the remote by digest and, if a writer could be opened,
+// persisted to the local store. Failing to persist is only logged; the
+// manifest is still returned.
+//
+// An error is returned only when the remote fetch fails.
+func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
+	l := log.G(ctx)
+
+	if desc.MediaType == "" {
+		// When pulling by digest we will not have the media type on the
+		// descriptor since we have not made a request to the registry yet
+		//
+		// We already have the digest, so we only lookup locally... by digest.
+		//
+		// Let's try to detect the media type so we can have a good ref key
+		// here. We may not even have the content locally, and this is fine, but
+		// if we do we should determine that.
+		mt, err := m.getMediaType(ctx, desc)
+		if err != nil && !errdefs.IsNotFound(err) {
+			l.WithError(err).Warn("Error looking up media type of content")
+		}
+		desc.MediaType = mt
+	}
+
+	key := remotes.MakeRefKey(ctx, desc)
+
+	// Here we open a writer to the requested content. This both gives us a
+	// reference to write to if indeed we need to persist it and increments the
+	// ref count on the content.
+	w, err := m.local.Writer(ctx, content.WithDescriptor(desc), content.WithRef(key))
+	if err != nil {
+		if errdefs.IsAlreadyExists(err) {
+			var manifest distribution.Manifest
+			if manifest, err = m.getLocal(ctx, desc); err == nil {
+				return manifest, nil
+			}
+		}
+		// always fallback to the remote if there is an error with the local store
+	}
+	if w != nil {
+		defer w.Close()
+	}
+
+	l.WithError(err).Debug("Fetching manifest from remote")
+
+	manifest, err := m.remote.Get(ctx, desc.Digest)
+	if err != nil {
+		// Only abort an ingest that this call opened. Without a writer the
+		// ref either does not exist (which would just produce a spurious
+		// warning) or may belong to someone else using the same key.
+		if w != nil {
+			if err := m.local.Abort(ctx, key); err != nil {
+				l.WithError(err).Warn("Error while attempting to abort content ingest")
+			}
+		}
+		return nil, err
+	}
+
+	if w != nil {
+		// if `w` is nil here, something happened with the content store, so don't bother trying to persist.
+		if err := m.Put(ctx, manifest, desc, w); err != nil {
+			if err := m.local.Abort(ctx, key); err != nil {
+				l.WithError(err).Warn("error aborting content ingest")
+			}
+			l.WithError(err).Warn("Error persisting manifest")
+		}
+	}
+	return manifest, nil
+}
+
+// Put writes the payload of manifest to w and commits it under desc.Digest,
+// using the payload length as the committed size.
+func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer) error {
+	mediaType, raw, err := manifest.Payload()
+	if err != nil {
+		return err
+	}
+	desc.MediaType = mediaType
+	desc.Size = int64(len(raw))
+
+	if _, writeErr := w.Write(raw); writeErr != nil {
+		return errors.Wrap(writeErr, "error writing manifest to content store")
+	}
+
+	commitErr := w.Commit(ctx, desc.Size, desc.Digest)
+	if commitErr != nil {
+		return errors.Wrap(commitErr, "error committing manifest to content store")
+	}
+	return nil
+}
+
+// detectManifestMediaType reads the whole content behind ra and detects the
+// manifest media type from it.
+func detectManifestMediaType(ra content.ReaderAt) (string, error) {
+	dt := make([]byte, ra.Size())
+	// The io.ReaderAt contract allows an implementation to return io.EOF
+	// together with a complete read that ends exactly at the end of the
+	// input, so only treat io.EOF as a failure when the read came up short.
+	if n, err := ra.ReadAt(dt, 0); err != nil && (err != io.EOF || n < len(dt)) {
+		return "", err
+	}
+
+	return detectManifestBlobMediaType(dt)
+}
+
+// detectManifestBlobMediaType guesses the media type of a raw manifest blob.
+//
+// This is used when the manifest store does not know the media type of a sha it
+// was told to get. This would currently only happen when pulling by digest.
+// The media type is needed so the blob can be unmarshalled properly.
+func detectManifestBlobMediaType(dt []byte) (string, error) {
+	var probe struct {
+		MediaType string          `json:"mediaType"`
+		Config    json.RawMessage `json:"config"`   // schema2 Manifest
+		FSLayers  json.RawMessage `json:"fsLayers"` // schema1 Manifest
+	}
+	if err := json.Unmarshal(dt, &probe); err != nil {
+		return "", err
+	}
+
+	// We may have a media type specified in the json, in which case that should be used.
+	// Docker types should generally have a media type set.
+	// OCI (golang) types do not have a `mediaType` defined, and it is optional in the spec.
+	//
+	// `distribution.UnmarshalManifest`, which is used to unmarshal this for real, checks these media type values.
+	// If the specified media type does not match it will error, and in some cases (docker media types) it is required.
+	// So pretty much if we don't have a media type we can fall back to OCI.
+	// This does have a special fallback for schema1 manifests just because it is easy to detect.
+	if probe.MediaType != "" {
+		return probe.MediaType, nil
+	}
+	if probe.FSLayers != nil {
+		return schema1.MediaTypeManifest, nil
+	}
+	if probe.Config != nil {
+		return specs.MediaTypeImageManifest, nil
+	}
+	return specs.MediaTypeImageIndex, nil
+}
diff --git a/distribution/manifest_test.go b/distribution/manifest_test.go
new file mode 100644
index 0000000..0976a71
--- /dev/null
+++ b/distribution/manifest_test.go
@@ -0,0 +1,351 @@
+package distribution
+
+import (
+ "context"
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "strings"
+ "sync"
+ "testing"
+
+ "github.com/containerd/containerd/content"
+ "github.com/containerd/containerd/content/local"
+ "github.com/containerd/containerd/errdefs"
+ "github.com/containerd/containerd/remotes"
+ "github.com/docker/distribution"
+ "github.com/docker/distribution/manifest/ocischema"
+ "github.com/docker/distribution/manifest/schema1"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ digest "github.com/opencontainers/go-digest"
+ specs "github.com/opencontainers/image-spec/specs-go/v1"
+ "github.com/pkg/errors"
+ "gotest.tools/v3/assert"
+ "gotest.tools/v3/assert/cmp"
+)
+
+// mockManifestGetter is an in-memory manifestGetter that counts how often Get
+// has been called.
+type mockManifestGetter struct {
+	manifests map[digest.Digest]distribution.Manifest
+	gets      int
+}
+
+// Get returns the manifest registered for dgst, or ErrManifestUnknown when
+// there is none. Every call is counted in m.gets.
+func (m *mockManifestGetter) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
+	m.gets++
+	if found, known := m.manifests[dgst]; known {
+		return found, nil
+	}
+	return nil, distribution.ErrManifestUnknown{Tag: dgst.String()}
+}
+
+// memoryLabelStore is a label store that keeps all labels in memory.
+// The zero value is ready to use.
+type memoryLabelStore struct {
+	mu     sync.Mutex
+	labels map[digest.Digest]map[string]string
+}
+
+// Get returns all the labels for the given digest
+func (s *memoryLabelStore) Get(dgst digest.Digest) (map[string]string, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.labels[dgst], nil
+}
+
+// Set sets all the labels for a given digest
+func (s *memoryLabelStore) Set(dgst digest.Digest, labels map[string]string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.labels == nil {
+		s.labels = make(map[digest.Digest]map[string]string)
+	}
+	s.labels[dgst] = labels
+	return nil
+}
+
+// Update replaces the given labels for a digest,
+// a key with an empty value removes a label.
+func (s *memoryLabelStore) Update(dgst digest.Digest, update map[string]string) (map[string]string, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// The store may never have been written to; writing to a nil map panics.
+	if s.labels == nil {
+		s.labels = make(map[digest.Digest]map[string]string)
+	}
+
+	// Also covers a nil label map stored through Set.
+	labels := s.labels[dgst]
+	if labels == nil {
+		labels = map[string]string{}
+	}
+	for k, v := range update {
+		if v == "" {
+			delete(labels, k)
+			continue
+		}
+		labels[k] = v
+	}
+
+	s.labels[dgst] = labels
+
+	return labels, nil
+}
+
+// testingContentStoreWrapper wraps a ContentStore and can be told to fail when
+// a writer is opened or when a writer it handed out is committed.
+type testingContentStoreWrapper struct {
+	ContentStore
+	errorOnWriter error
+	errorOnCommit error
+}
+
+// Writer returns errorOnWriter when it is set. Otherwise it opens a writer on
+// the wrapped store and, when errorOnCommit is set, wraps that writer so that
+// committing it fails with that error.
+func (s *testingContentStoreWrapper) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
+	if failure := s.errorOnWriter; failure != nil {
+		return nil, failure
+	}
+
+	inner, err := s.ContentStore.Writer(ctx, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	if s.errorOnCommit == nil {
+		return inner, nil
+	}
+	return &testingContentWriterWrapper{inner, s.errorOnCommit}, nil
+}
+
+// testingContentWriterWrapper wraps a content.Writer whose Commit fails with
+// err when err is set.
+type testingContentWriterWrapper struct {
+	content.Writer
+	err error
+}
+
+// Commit fails with w.err when it is set, otherwise it commits through the
+// wrapped writer.
+func (w *testingContentWriterWrapper) Commit(ctx context.Context, size int64, dgst digest.Digest, opts ...content.Opt) error {
+	if w.err == nil {
+		return w.Writer.Commit(ctx, size, dgst, opts...)
+	}
+	// The contract for `Commit` is to always close.
+	// Since this is returning early before hitting the real `Commit`, we should close it here.
+	w.Close()
+	return w.err
+}
+
+// TestManifestStore exercises manifestStore.Get against a local content store
+// in a temporary directory and an in-memory remote.
+//
+// It covers a manifest found neither locally nor remotely, only remotely, only
+// locally, lookups without a media type on the descriptor, and failures of the
+// local content store while persisting. Every case also checks that no ingest
+// is left active afterwards.
+func TestManifestStore(t *testing.T) {
+	ociManifest := &specs.Manifest{}
+	serialized, err := json.Marshal(ociManifest)
+	assert.NilError(t, err)
+	dgst := digest.Canonical.FromBytes(serialized)
+
+	setupTest := func(t *testing.T) (specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) {
+		root, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1))
+		assert.NilError(t, err)
+		// If setup itself fails the caller never gets the teardown function,
+		// so remove the directory here.
+		defer func() {
+			if t.Failed() {
+				os.RemoveAll(root)
+			}
+		}()
+
+		cs, err := local.NewLabeledStore(root, &memoryLabelStore{})
+		assert.NilError(t, err)
+
+		mg := &mockManifestGetter{manifests: make(map[digest.Digest]distribution.Manifest)}
+		store := &manifestStore{local: cs, remote: mg}
+		desc := specs.Descriptor{Digest: dgst, MediaType: specs.MediaTypeImageManifest, Size: int64(len(serialized))}
+
+		return desc, mg, store, cs, func(t *testing.T) {
+			assert.Check(t, os.RemoveAll(root))
+		}
+	}
+
+	ctx := context.Background()
+
+	m, _, err := distribution.UnmarshalManifest(specs.MediaTypeImageManifest, serialized)
+	assert.NilError(t, err)
+
+	writeManifest := func(t *testing.T, cs ContentStore, desc specs.Descriptor, opts ...content.Opt) {
+		ingestKey := remotes.MakeRefKey(ctx, desc)
+		w, err := cs.Writer(ctx, content.WithDescriptor(desc), content.WithRef(ingestKey))
+		assert.NilError(t, err)
+		defer func() {
+			if err := w.Close(); err != nil {
+				t.Log(err)
+			}
+			if t.Failed() {
+				if err := cs.Abort(ctx, ingestKey); err != nil {
+					t.Log(err)
+				}
+			}
+		}()
+
+		_, err = w.Write(serialized)
+		assert.NilError(t, err)
+
+		err = w.Commit(ctx, desc.Size, desc.Digest, opts...)
+		assert.NilError(t, err)
+	}
+
+	// All tests should end up with no active ingest
+	checkIngest := func(t *testing.T, cs content.Store, desc specs.Descriptor) {
+		ingestKey := remotes.MakeRefKey(ctx, desc)
+		_, err := cs.Status(ctx, ingestKey)
+		assert.Check(t, errdefs.IsNotFound(err), err)
+	}
+
+	t.Run("no remote or local", func(t *testing.T) {
+		desc, _, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		// Use a local err so the subtest does not write to the variable of
+		// the enclosing test function.
+		_, err := store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		// This error is what our digest getter returns when it doesn't know about the manifest
+		assert.Error(t, err, distribution.ErrManifestUnknown{Tag: dgst.String()}.Error())
+	})
+
+	t.Run("no local cache", func(t *testing.T) {
+		desc, mg, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		mg.manifests[desc.Digest] = m
+
+		m2, err := store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 1))
+
+		i, err := cs.Info(ctx, desc.Digest)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
+
+		// Now check again, this should not hit the remote
+		m2, err = store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 1))
+	})
+
+	t.Run("with local cache", func(t *testing.T) {
+		desc, mg, store, cs, teardown := setupTest(t)
+		defer teardown(t)
+
+		// first add the manifest to the content store
+		writeManifest(t, cs, desc)
+
+		// now do the get
+		m2, err := store.Get(ctx, desc)
+		checkIngest(t, cs, desc)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+		assert.Check(t, cmp.Equal(mg.gets, 0))
+
+		i, err := cs.Info(ctx, desc.Digest)
+		assert.NilError(t, err)
+		assert.Check(t, cmp.Equal(i.Digest, desc.Digest))
+	})
+
+	// This is for the case of pull by digest where we don't know the media type of the manifest until it's actually pulled.
+	t.Run("unknown media type", func(t *testing.T) {
+		t.Run("no cache", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+
+			mg.manifests[desc.Digest] = m
+			desc.MediaType = ""
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+		})
+
+		t.Run("with cache", func(t *testing.T) {
+			t.Run("cached manifest has media type", func(t *testing.T) {
+				desc, mg, store, cs, teardown := setupTest(t)
+				defer teardown(t)
+
+				writeManifest(t, cs, desc)
+				desc.MediaType = ""
+
+				m2, err := store.Get(ctx, desc)
+				checkIngest(t, cs, desc)
+				assert.NilError(t, err)
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+				assert.Check(t, cmp.Equal(mg.gets, 0))
+			})
+
+			t.Run("cached manifest has no media type", func(t *testing.T) {
+				desc, mg, store, cs, teardown := setupTest(t)
+				defer teardown(t)
+
+				desc.MediaType = ""
+				writeManifest(t, cs, desc)
+
+				m2, err := store.Get(ctx, desc)
+				checkIngest(t, cs, desc)
+				assert.NilError(t, err)
+				assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+				assert.Check(t, cmp.Equal(mg.gets, 0))
+			})
+		})
+	})
+
+	// Test that if there is an error with the content store, for whatever
+	// reason, that doesn't stop us from getting the manifest.
+	//
+	// Also makes sure the ingests are aborted.
+	t.Run("error persisting manifest", func(t *testing.T) {
+		t.Run("error on writer", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+			mg.manifests[desc.Digest] = m
+
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnWriter: errors.New("random error")}
+			store.local = csW
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+
+			_, err = cs.Info(ctx, desc.Digest)
+			// Nothing here since we couldn't persist
+			assert.Check(t, errdefs.IsNotFound(err), err)
+		})
+
+		t.Run("error on commit", func(t *testing.T) {
+			desc, mg, store, cs, teardown := setupTest(t)
+			defer teardown(t)
+			mg.manifests[desc.Digest] = m
+
+			csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnCommit: errors.New("random error")}
+			store.local = csW
+
+			m2, err := store.Get(ctx, desc)
+			checkIngest(t, cs, desc)
+			assert.NilError(t, err)
+			assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
+			assert.Check(t, cmp.Equal(mg.gets, 1))
+
+			_, err = cs.Info(ctx, desc.Digest)
+			// Nothing here since we couldn't persist
+			assert.Check(t, errdefs.IsNotFound(err), err)
+		})
+	})
+}
+
+// TestDetectManifestBlobMediaType checks the media type detected for a set of
+// small JSON documents, including that an explicit mediaType always wins.
+func TestDetectManifestBlobMediaType(t *testing.T) {
+	type scenario struct {
+		json     []byte
+		expected string
+	}
+	scenarios := map[string]scenario{
+		"mediaType is set":   {json: []byte(`{"mediaType": "bananas"}`), expected: "bananas"},
+		"oci manifest":       {json: []byte(`{"config": {}}`), expected: specs.MediaTypeImageManifest},
+		"schema1":            {json: []byte(`{"fsLayers": []}`), expected: schema1.MediaTypeManifest},
+		"oci index fallback": {json: []byte(`{}`), expected: specs.MediaTypeImageIndex},
+		// Make sure we prefer mediaType
+		"mediaType and config set":   {json: []byte(`{"mediaType": "bananas", "config": {}}`), expected: "bananas"},
+		"mediaType and fsLayers set": {json: []byte(`{"mediaType": "bananas", "fsLayers": []}`), expected: "bananas"},
+	}
+
+	for label, sc := range scenarios {
+		t.Run(label, func(t *testing.T) {
+			detected, err := detectManifestBlobMediaType(sc.json)
+			assert.NilError(t, err)
+			assert.Equal(t, detected, sc.expected)
+		})
+	}
+}
diff --git a/distribution/pull.go b/distribution/pull.go
index ecf2f98..c8ddd4c 100644
--- a/distribution/pull.go
+++ b/distribution/pull.go
@@ -29,7 +29,7 @@
// whether a v1 or v2 puller will be created. The other parameters are passed
// through to the underlying puller implementation for use during the actual
// pull operation.
-func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig) (Puller, error) {
+func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Puller{
@@ -37,6 +37,9 @@
endpoint: endpoint,
config: imagePullConfig,
repoInfo: repoInfo,
+ manifestStore: &manifestStore{
+ local: local,
+ },
}, nil
case registry.APIVersion1:
return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
@@ -46,7 +49,7 @@
// Pull initiates a pull operation. image is the repository name to pull, and
// tag may be either empty, or indicate a specific tag to pull.
-func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig) error {
+func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error {
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref)
if err != nil {
@@ -104,7 +107,7 @@
logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version)
- puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
+ puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local)
if err != nil {
lastErr = err
continue
diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go
index 54b66d0..b5db4ee 100644
--- a/distribution/pull_v2.go
+++ b/distribution/pull_v2.go
@@ -11,6 +11,7 @@
"runtime"
"strings"
+ "github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/manifestlist"
@@ -62,7 +63,8 @@
repo distribution.Repository
// confirmedV2 is set to true if we confirm we're talking to a v2
// registry. This is used to limit fallbacks to the v1 protocol.
- confirmedV2 bool
+ confirmedV2 bool
+ manifestStore *manifestStore
}
func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *specs.Platform) (err error) {
@@ -73,6 +75,11 @@
return err
}
+ p.manifestStore.remote, err = p.repo.Manifests(ctx)
+ if err != nil {
+ return err
+ }
+
if err = p.pullV2Repository(ctx, ref, platform); err != nil {
if _, ok := err.(fallbackError); ok {
return err
@@ -330,31 +337,45 @@
}
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) {
- manSvc, err := p.repo.Manifests(ctx)
- if err != nil {
- return false, err
- }
var (
- manifest distribution.Manifest
tagOrDigest string // Used for logging/progress only
+ dgst digest.Digest
+ mt string
+ size int64
)
if digested, isDigested := ref.(reference.Canonical); isDigested {
- manifest, err = manSvc.Get(ctx, digested.Digest())
- if err != nil {
- return false, err
- }
- tagOrDigest = digested.Digest().String()
+ dgst = digested.Digest()
+ tagOrDigest = digested.String()
} else if tagged, isTagged := ref.(reference.NamedTagged); isTagged {
- manifest, err = manSvc.Get(ctx, "", distribution.WithTag(tagged.Tag()))
+ tagService := p.repo.Tags(ctx)
+ desc, err := tagService.Get(ctx, tagged.Tag())
if err != nil {
return false, allowV1Fallback(err)
}
+ dgst = desc.Digest
tagOrDigest = tagged.Tag()
+ mt = desc.MediaType
+ size = desc.Size
} else {
return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", reference.FamiliarString(ref))
}
+ ctx = log.WithLogger(ctx, logrus.WithFields(
+ logrus.Fields{
+ "digest": dgst,
+ "remote": ref,
+ }))
+
+ manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
+ MediaType: mt,
+ Digest: dgst,
+ Size: size,
+ })
+ if err != nil {
+ return false, err
+ }
+
if manifest == nil {
return false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest)
}
@@ -553,7 +574,7 @@
return "", "", err
}
- imageID, err := p.config.ImageStore.Put(config)
+ imageID, err := p.config.ImageStore.Put(ctx, config)
if err != nil {
return "", "", err
}
@@ -564,7 +585,7 @@
}
func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) {
- if _, err := p.config.ImageStore.Get(target.Digest); err == nil {
+ if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil {
// If the image already exists locally, no need to pull
// anything.
return target.Digest, nil
@@ -721,7 +742,7 @@
}
}
- imageID, err := p.config.ImageStore.Put(configJSON)
+ imageID, err := p.config.ImageStore.Put(ctx, configJSON)
if err != nil {
return "", err
}
@@ -791,23 +812,22 @@
if len(manifestMatches) > 1 {
logrus.Debugf("found multiple matches in manifest list, choosing best match %s", manifestMatches[0].Digest.String())
}
- manifestDigest := manifestMatches[0].Digest
+ match := manifestMatches[0]
- if err := checkImageCompatibility(manifestMatches[0].Platform.OS, manifestMatches[0].Platform.OSVersion); err != nil {
+ if err := checkImageCompatibility(match.Platform.OS, match.Platform.OSVersion); err != nil {
return "", "", err
}
- manSvc, err := p.repo.Manifests(ctx)
+ manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{
+ Digest: match.Digest,
+ Size: match.Size,
+ MediaType: match.MediaType,
+ })
if err != nil {
return "", "", err
}
- manifest, err := manSvc.Get(ctx, manifestDigest)
- if err != nil {
- return "", "", err
- }
-
- manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), manifestDigest)
+ manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), match.Digest)
if err != nil {
return "", "", err
}
diff --git a/distribution/push_v2.go b/distribution/push_v2.go
index b64230f..764149c 100644
--- a/distribution/push_v2.go
+++ b/distribution/push_v2.go
@@ -116,7 +116,7 @@
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
- imgConfig, err := p.config.ImageStore.Get(id)
+ imgConfig, err := p.config.ImageStore.Get(ctx, id)
if err != nil {
return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err)
}
diff --git a/distribution/registry_unit_test.go b/distribution/registry_unit_test.go
index 3651c46..60fbeb9 100644
--- a/distribution/registry_unit_test.go
+++ b/distribution/registry_unit_test.go
@@ -69,7 +69,7 @@
},
Schema2Types: ImageTypes,
}
- puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
+ puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
if err != nil {
t.Fatal(err)
}