blob: ae3814761d312de60320463ecddcf61e03c9376b [file] [log] [blame]
package containerd
import (
"context"
"fmt"
"io"
"strings"
"sync"
"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/streamformatter"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
// PushImage initiates a push operation of the image pointed to by targetRef.
// Image manifest (or index) is pushed as is, which will probably fail if you
// don't have all content referenced by the index.
// Cross-repo mounts will be attempted for non-existing blobs.
//
// It will also add distribution source labels to the pushed content
// pointing to the new target repository. This will allow subsequent pushes
// to perform cross-repo mounts of the shared content when pushing to a different
// repository on the same registry.
func (i *ImageService) PushImage(ctx context.Context, targetRef reference.Named, metaHeaders map[string][]string, authConfig *registry.AuthConfig, outStream io.Writer) error {
if _, tagged := targetRef.(reference.Tagged); !tagged {
if _, digested := targetRef.(reference.Digested); !digested {
return errdefs.NotImplemented(errors.New("push all tags is not implemented"))
}
}
leasedCtx, release, err := i.client.WithLease(ctx)
if err != nil {
return err
}
defer func() {
err := release(leasedCtx)
if err != nil && !cerrdefs.IsNotFound(err) {
logrus.WithField("image", targetRef).WithError(err).Error("failed to delete lease created for push")
}
}()
out := streamformatter.NewJSONProgressOutput(outStream, false)
img, err := i.client.ImageService().Get(ctx, targetRef.String())
if err != nil {
return errdefs.NotFound(err)
}
target := img.Target
store := i.client.ContentStore()
resolver, tracker := i.newResolverFromAuthConfig(authConfig)
progress := pushProgress{Tracker: tracker}
jobsQueue := newJobs()
finishProgress := jobsQueue.showProgress(ctx, out, combinedProgress([]progressUpdater{
&progress,
pullProgress{ShowExists: false, Store: store},
}))
defer finishProgress()
var limiter *semaphore.Weighted = nil // TODO: Respect max concurrent downloads/uploads
mountableBlobs, err := findMissingMountable(ctx, store, jobsQueue, target, targetRef, limiter)
if err != nil {
return err
}
for dgst := range mountableBlobs {
progress.addMountable(dgst)
}
// Create a store which fakes the local existence of possibly mountable blobs.
// Otherwise they can't be pushed at all.
realStore := store
wrapped := wrapWithFakeMountableBlobs(store, mountableBlobs)
store = wrapped
pusher, err := resolver.Pusher(ctx, targetRef.String())
if err != nil {
return err
}
addChildrenToJobs := containerdimages.HandlerFunc(
func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
children, err := containerdimages.Children(ctx, store, desc)
if err != nil {
return nil, err
}
for _, c := range children {
jobsQueue.Add(c)
}
jobsQueue.Add(desc)
return nil, nil
},
)
handlerWrapper := func(h images.Handler) images.Handler {
return containerdimages.Handlers(addChildrenToJobs, h)
}
err = remotes.PushContent(ctx, pusher, target, store, limiter, platforms.All, handlerWrapper)
if err != nil {
if containerdimages.IsIndexType(target.MediaType) {
if cerrdefs.IsNotFound(err) {
err = errdefs.NotFound(fmt.Errorf(
"missing content: %w\n"+
"Note: You're trying to push a manifest list/index which "+
"references multiple platform specific manifests, but not all of them are available locally "+
"or available to the remote repository.\n"+
"Make sure you have all the referenced content and try again.",
err))
}
}
} else {
appendSource, err := docker.AppendDistributionSourceLabel(realStore, targetRef.String())
if err != nil {
// This shouldn't happen at this point because the reference would have to be invalid
// and if it was, then it would error out earlier.
return errdefs.Unknown(errors.Wrap(err, "failed to create an handler that appends distribution source label to pushed content"))
}
if err := containerdimages.Dispatch(ctx, appendSource, nil, target); err != nil {
// Shouldn't happen, but even if it would fail, then make it only a warning
// because it doesn't affect the pushed data.
logrus.WithError(err).Warn("failed to append distribution source labels to pushed content")
}
}
return err
}
// findMissingMountable will walk the target descriptor recursively and return
// missing contents with their distribution source which could potentially
// be cross-repo mounted.
func findMissingMountable(ctx context.Context, store content.Store, queue *jobs,
target ocispec.Descriptor, targetRef reference.Named, limiter *semaphore.Weighted,
) (map[digest.Digest]distributionSource, error) {
mountableBlobs := map[digest.Digest]distributionSource{}
var mutex sync.Mutex
sources, err := getDigestSources(ctx, store, target.Digest)
if err != nil {
if !errdefs.IsNotFound(err) {
return nil, err
}
logrus.WithField("target", target).Debug("distribution source label not found")
return mountableBlobs, nil
}
handler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
_, err := store.Info(ctx, desc.Digest)
if err != nil {
if !cerrdefs.IsNotFound(err) {
return nil, errdefs.System(errors.Wrapf(err, "failed to get metadata of content %s", desc.Digest.String()))
}
for _, source := range sources {
if canBeMounted(desc.MediaType, targetRef, source) {
mutex.Lock()
mountableBlobs[desc.Digest] = source
mutex.Unlock()
queue.Add(desc)
break
}
}
return nil, nil
}
return containerdimages.Children(ctx, store, desc)
}
err = containerdimages.Dispatch(ctx, containerdimages.HandlerFunc(handler), limiter, target)
if err != nil {
return nil, err
}
return mountableBlobs, nil
}
func getDigestSources(ctx context.Context, store content.Manager, digest digest.Digest) ([]distributionSource, error) {
info, err := store.Info(ctx, digest)
if err != nil {
if cerrdefs.IsNotFound(err) {
return nil, errdefs.NotFound(err)
}
return nil, errdefs.System(err)
}
sources := extractDistributionSources(info.Labels)
if sources == nil {
return nil, errdefs.NotFound(fmt.Errorf("label %q is not attached to %s", labelDistributionSource, digest.String()))
}
return sources, nil
}
// TODO(vvoland): Remove and use containerd const in containerd 1.7+
// https://github.com/containerd/containerd/pull/8224
const labelDistributionSource = "containerd.io/distribution.source."
func extractDistributionSources(labels map[string]string) []distributionSource {
var sources []distributionSource
// Check if this blob has a distributionSource label
// if yes, read it as source
for k, v := range labels {
if reg := strings.TrimPrefix(k, labelDistributionSource); reg != k {
for _, repo := range strings.Split(v, ",") {
ref, err := reference.ParseNamed(reg + "/" + repo)
if err != nil {
continue
}
sources = append(sources, distributionSource{
registryRef: ref,
})
}
}
}
return sources
}
type distributionSource struct {
registryRef reference.Named
}
// ToAnnotation returns key and value
func (source distributionSource) ToAnnotation() (string, string) {
domain := reference.Domain(source.registryRef)
v := reference.Path(source.registryRef)
return labelDistributionSource + domain, v
}
func (source distributionSource) GetReference(dgst digest.Digest) (reference.Named, error) {
return reference.WithDigest(source.registryRef, dgst)
}
// canBeMounted returns if the content with given media type can be cross-repo
// mounted when pushing it to a remote reference ref.
func canBeMounted(mediaType string, targetRef reference.Named, source distributionSource) bool {
if containerdimages.IsManifestType(mediaType) {
return false
}
if containerdimages.IsIndexType(mediaType) {
return false
}
reg := reference.Domain(targetRef)
// Remove :port suffix from domain
// containerd distribution source label doesn't store port
if portIdx := strings.LastIndex(reg, ":"); portIdx != -1 {
reg = reg[:portIdx]
}
// If the source registry is the same as the one we are pushing to
// then the cross-repo mount will work.
return reg == reference.Domain(source.registryRef)
}