| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package containerd |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| |
| "github.com/containerd/containerd/errdefs" |
| "github.com/containerd/containerd/images" |
| "github.com/containerd/containerd/platforms" |
| "github.com/containerd/containerd/remotes" |
| "github.com/containerd/containerd/remotes/docker" |
| "github.com/containerd/containerd/remotes/docker/schema1" |
| ocispec "github.com/opencontainers/image-spec/specs-go/v1" |
| "golang.org/x/sync/errgroup" |
| "golang.org/x/sync/semaphore" |
| ) |
| |
| // Pull downloads the provided content into containerd's content store |
| // and returns a platform specific image object |
| func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (_ Image, retErr error) { |
| pullCtx := defaultRemoteContext() |
| for _, o := range opts { |
| if err := o(c, pullCtx); err != nil { |
| return nil, err |
| } |
| } |
| |
| if pullCtx.PlatformMatcher == nil { |
| if len(pullCtx.Platforms) > 1 { |
| return nil, errors.New("cannot pull multiplatform image locally, try Fetch") |
| } else if len(pullCtx.Platforms) == 0 { |
| pullCtx.PlatformMatcher = c.platform |
| } else { |
| p, err := platforms.Parse(pullCtx.Platforms[0]) |
| if err != nil { |
| return nil, fmt.Errorf("invalid platform %s: %w", pullCtx.Platforms[0], err) |
| } |
| |
| pullCtx.PlatformMatcher = platforms.Only(p) |
| } |
| } |
| |
| ctx, done, err := c.WithLease(ctx) |
| if err != nil { |
| return nil, err |
| } |
| defer done(ctx) |
| |
| var unpacks int32 |
| var unpackEg *errgroup.Group |
| var unpackWrapper func(f images.Handler) images.Handler |
| |
| if pullCtx.Unpack { |
| // unpacker only supports schema 2 image, for schema 1 this is noop. |
| u, err := c.newUnpacker(ctx, pullCtx) |
| if err != nil { |
| return nil, fmt.Errorf("create unpacker: %w", err) |
| } |
| unpackWrapper, unpackEg = u.handlerWrapper(ctx, pullCtx, &unpacks) |
| defer func() { |
| if err := unpackEg.Wait(); err != nil { |
| if retErr == nil { |
| retErr = fmt.Errorf("unpack: %w", err) |
| } |
| } |
| }() |
| wrapper := pullCtx.HandlerWrapper |
| pullCtx.HandlerWrapper = func(h images.Handler) images.Handler { |
| if wrapper == nil { |
| return unpackWrapper(h) |
| } |
| return unpackWrapper(wrapper(h)) |
| } |
| } |
| |
| img, err := c.fetch(ctx, pullCtx, ref, 1) |
| if err != nil { |
| return nil, err |
| } |
| |
| // NOTE(fuweid): unpacker defers blobs download. before create image |
| // record in ImageService, should wait for unpacking(including blobs |
| // download). |
| if pullCtx.Unpack { |
| if unpackEg != nil { |
| if err := unpackEg.Wait(); err != nil { |
| return nil, err |
| } |
| } |
| } |
| |
| img, err = c.createNewImage(ctx, img) |
| if err != nil { |
| return nil, err |
| } |
| |
| i := NewImageWithPlatform(c, img, pullCtx.PlatformMatcher) |
| |
| if pullCtx.Unpack { |
| if unpacks == 0 { |
| // Try to unpack is none is done previously. |
| // This is at least required for schema 1 image. |
| if err := i.Unpack(ctx, pullCtx.Snapshotter, pullCtx.UnpackOpts...); err != nil { |
| return nil, fmt.Errorf("failed to unpack image on snapshotter %s: %w", pullCtx.Snapshotter, err) |
| } |
| } |
| } |
| |
| return i, nil |
| } |
| |
| func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, limit int) (images.Image, error) { |
| store := c.ContentStore() |
| name, desc, err := rCtx.Resolver.Resolve(ctx, ref) |
| if err != nil { |
| return images.Image{}, fmt.Errorf("failed to resolve reference %q: %w", ref, err) |
| } |
| |
| fetcher, err := rCtx.Resolver.Fetcher(ctx, name) |
| if err != nil { |
| return images.Image{}, fmt.Errorf("failed to get fetcher for %q: %w", name, err) |
| } |
| |
| var ( |
| handler images.Handler |
| |
| isConvertible bool |
| converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error) |
| limiter *semaphore.Weighted |
| ) |
| |
| if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 { |
| schema1Converter := schema1.NewConverter(store, fetcher) |
| |
| handler = images.Handlers(append(rCtx.BaseHandlers, schema1Converter)...) |
| |
| isConvertible = true |
| |
| converterFunc = func(ctx context.Context, _ ocispec.Descriptor) (ocispec.Descriptor, error) { |
| return schema1Converter.Convert(ctx) |
| } |
| } else { |
| // Get all the children for a descriptor |
| childrenHandler := images.ChildrenHandler(store) |
| // Set any children labels for that content |
| childrenHandler = images.SetChildrenMappedLabels(store, childrenHandler, rCtx.ChildLabelMap) |
| if rCtx.AllMetadata { |
| // Filter manifests by platforms but allow to handle manifest |
| // and configuration for not-target platforms |
| childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher) |
| } else { |
| // Filter children by platforms if specified. |
| childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher) |
| } |
| // Sort and limit manifests if a finite number is needed |
| if limit > 0 { |
| childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) |
| } |
| |
| // set isConvertible to true if there is application/octet-stream media type |
| convertibleHandler := images.HandlerFunc( |
| func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { |
| if desc.MediaType == docker.LegacyConfigMediaType { |
| isConvertible = true |
| } |
| |
| return []ocispec.Descriptor{}, nil |
| }, |
| ) |
| |
| appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref) |
| if err != nil { |
| return images.Image{}, err |
| } |
| |
| handlers := append(rCtx.BaseHandlers, |
| remotes.FetchHandler(store, fetcher), |
| convertibleHandler, |
| childrenHandler, |
| appendDistSrcLabelHandler, |
| ) |
| |
| handler = images.Handlers(handlers...) |
| |
| converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) { |
| return docker.ConvertManifest(ctx, store, desc) |
| } |
| } |
| |
| if rCtx.HandlerWrapper != nil { |
| handler = rCtx.HandlerWrapper(handler) |
| } |
| |
| if rCtx.MaxConcurrentDownloads > 0 { |
| limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) |
| } |
| |
| if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { |
| return images.Image{}, err |
| } |
| |
| if isConvertible { |
| if desc, err = converterFunc(ctx, desc); err != nil { |
| return images.Image{}, err |
| } |
| } |
| |
| return images.Image{ |
| Name: name, |
| Target: desc, |
| Labels: rCtx.Labels, |
| }, nil |
| } |
| |
| func (c *Client) createNewImage(ctx context.Context, img images.Image) (images.Image, error) { |
| is := c.ImageService() |
| for { |
| if created, err := is.Create(ctx, img); err != nil { |
| if !errdefs.IsAlreadyExists(err) { |
| return images.Image{}, err |
| } |
| |
| updated, err := is.Update(ctx, img) |
| if err != nil { |
| // if image was removed, try create again |
| if errdefs.IsNotFound(err) { |
| continue |
| } |
| return images.Image{}, err |
| } |
| |
| img = updated |
| } else { |
| img = created |
| } |
| |
| return img, nil |
| } |
| } |