| package graph |
| |
| import ( |
| "errors" |
| "fmt" |
| "net" |
| "net/url" |
| "strings" |
| "time" |
| |
| "github.com/Sirupsen/logrus" |
| "github.com/docker/distribution/registry/client/transport" |
| "github.com/docker/docker/image" |
| "github.com/docker/docker/pkg/progressreader" |
| "github.com/docker/docker/pkg/streamformatter" |
| "github.com/docker/docker/pkg/stringid" |
| "github.com/docker/docker/registry" |
| "github.com/docker/docker/utils" |
| ) |
| |
| type v1Puller struct { |
| *TagStore |
| endpoint registry.APIEndpoint |
| config *ImagePullConfig |
| sf *streamformatter.StreamFormatter |
| repoInfo *registry.RepositoryInfo |
| session *registry.Session |
| } |
| |
| func (p *v1Puller) Pull(tag string) (fallback bool, err error) { |
| if utils.DigestReference(tag) { |
| // Allowing fallback, because HTTPS v1 is before HTTP v2 |
| return true, registry.ErrNoSupport{errors.New("Cannot pull by digest with v1 registry")} |
| } |
| |
| tlsConfig, err := p.registryService.TLSConfig(p.repoInfo.Index.Name) |
| if err != nil { |
| return false, err |
| } |
| // Adds Docker-specific headers as well as user-specified headers (metaHeaders) |
| tr := transport.NewTransport( |
| // TODO(tiborvass): was ReceiveTimeout |
| registry.NewTransport(tlsConfig), |
| registry.DockerHeaders(p.config.MetaHeaders)..., |
| ) |
| client := registry.HTTPClient(tr) |
| v1Endpoint, err := p.endpoint.ToV1Endpoint(p.config.MetaHeaders) |
| if err != nil { |
| logrus.Debugf("Could not get v1 endpoint: %v", err) |
| return true, err |
| } |
| p.session, err = registry.NewSession(client, p.config.AuthConfig, v1Endpoint) |
| if err != nil { |
| // TODO(dmcgowan): Check if should fallback |
| logrus.Debugf("Fallback from error: %s", err) |
| return true, err |
| } |
| if err := p.pullRepository(tag); err != nil { |
| // TODO(dmcgowan): Check if should fallback |
| return false, err |
| } |
| return false, nil |
| } |
| |
| func (p *v1Puller) pullRepository(askedTag string) error { |
| out := p.config.OutStream |
| out.Write(p.sf.FormatStatus("", "Pulling repository %s", p.repoInfo.CanonicalName)) |
| |
| repoData, err := p.session.GetRepositoryData(p.repoInfo.RemoteName) |
| if err != nil { |
| if strings.Contains(err.Error(), "HTTP code: 404") { |
| return fmt.Errorf("Error: image %s not found", utils.ImageReference(p.repoInfo.RemoteName, askedTag)) |
| } |
| // Unexpected HTTP error |
| return err |
| } |
| |
| logrus.Debugf("Retrieving the tag list") |
| tagsList := make(map[string]string) |
| if askedTag == "" { |
| tagsList, err = p.session.GetRemoteTags(repoData.Endpoints, p.repoInfo.RemoteName) |
| } else { |
| var tagId string |
| tagId, err = p.session.GetRemoteTag(repoData.Endpoints, p.repoInfo.RemoteName, askedTag) |
| tagsList[askedTag] = tagId |
| } |
| if err != nil { |
| if err == registry.ErrRepoNotFound && askedTag != "" { |
| return fmt.Errorf("Tag %s not found in repository %s", askedTag, p.repoInfo.CanonicalName) |
| } |
| logrus.Errorf("unable to get remote tags: %s", err) |
| return err |
| } |
| |
| for tag, id := range tagsList { |
| repoData.ImgList[id] = ®istry.ImgData{ |
| ID: id, |
| Tag: tag, |
| Checksum: "", |
| } |
| } |
| |
| logrus.Debugf("Registering tags") |
| // If no tag has been specified, pull them all |
| if askedTag == "" { |
| for tag, id := range tagsList { |
| repoData.ImgList[id].Tag = tag |
| } |
| } else { |
| // Otherwise, check that the tag exists and use only that one |
| id, exists := tagsList[askedTag] |
| if !exists { |
| return fmt.Errorf("Tag %s not found in repository %s", askedTag, p.repoInfo.CanonicalName) |
| } |
| repoData.ImgList[id].Tag = askedTag |
| } |
| |
| errors := make(chan error) |
| |
| layersDownloaded := false |
| imgIDs := []string{} |
| sessionID := p.session.ID() |
| defer func() { |
| p.graph.Release(sessionID, imgIDs...) |
| }() |
| for _, image := range repoData.ImgList { |
| downloadImage := func(img *registry.ImgData) { |
| if askedTag != "" && img.Tag != askedTag { |
| errors <- nil |
| return |
| } |
| |
| if img.Tag == "" { |
| logrus.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID) |
| errors <- nil |
| return |
| } |
| |
| // ensure no two downloads of the same image happen at the same time |
| if c, err := p.poolAdd("pull", "img:"+img.ID); err != nil { |
| if c != nil { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Layer already being pulled by another client. Waiting.", nil)) |
| <-c |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) |
| } else { |
| logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", img.ID, err) |
| } |
| errors <- nil |
| return |
| } |
| defer p.poolRemove("pull", "img:"+img.ID) |
| |
| // we need to retain it until tagging |
| p.graph.Retain(sessionID, img.ID) |
| imgIDs = append(imgIDs, img.ID) |
| |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s", img.Tag, p.repoInfo.CanonicalName), nil)) |
| success := false |
| var lastErr, err error |
| var isDownloaded bool |
| for _, ep := range p.repoInfo.Index.Mirrors { |
| ep += "v1/" |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, mirror: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) |
| if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { |
| // Don't report errors when pulling from mirrors. |
| logrus.Debugf("Error pulling image (%s) from %s, mirror: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err) |
| continue |
| } |
| layersDownloaded = layersDownloaded || isDownloaded |
| success = true |
| break |
| } |
| if !success { |
| for _, ep := range repoData.Endpoints { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Pulling image (%s) from %s, endpoint: %s", img.Tag, p.repoInfo.CanonicalName, ep), nil)) |
| if isDownloaded, err = p.pullImage(img.ID, ep, repoData.Tokens); err != nil { |
| // It's not ideal that only the last error is returned, it would be better to concatenate the errors. |
| // As the error is also given to the output stream the user will see the error. |
| lastErr = err |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), fmt.Sprintf("Error pulling image (%s) from %s, endpoint: %s, %s", img.Tag, p.repoInfo.CanonicalName, ep, err), nil)) |
| continue |
| } |
| layersDownloaded = layersDownloaded || isDownloaded |
| success = true |
| break |
| } |
| } |
| if !success { |
| err := fmt.Errorf("Error pulling image (%s) from %s, %v", img.Tag, p.repoInfo.CanonicalName, lastErr) |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), err.Error(), nil)) |
| errors <- err |
| return |
| } |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Download complete", nil)) |
| |
| errors <- nil |
| } |
| |
| go downloadImage(image) |
| } |
| |
| var lastError error |
| for i := 0; i < len(repoData.ImgList); i++ { |
| if err := <-errors; err != nil { |
| lastError = err |
| } |
| } |
| if lastError != nil { |
| return lastError |
| } |
| |
| for tag, id := range tagsList { |
| if askedTag != "" && tag != askedTag { |
| continue |
| } |
| if err := p.Tag(p.repoInfo.LocalName, tag, id, true); err != nil { |
| return err |
| } |
| } |
| |
| requestedTag := p.repoInfo.LocalName |
| if len(askedTag) > 0 { |
| requestedTag = utils.ImageReference(p.repoInfo.LocalName, askedTag) |
| } |
| WriteStatus(requestedTag, out, p.sf, layersDownloaded) |
| return nil |
| } |
| |
| func (p *v1Puller) pullImage(imgID, endpoint string, token []string) (bool, error) { |
| history, err := p.session.GetRemoteHistory(imgID, endpoint) |
| if err != nil { |
| return false, err |
| } |
| out := p.config.OutStream |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(imgID), "Pulling dependent layers", nil)) |
| // FIXME: Try to stream the images? |
| // FIXME: Launch the getRemoteImage() in goroutines |
| |
| sessionID := p.session.ID() |
| // As imgID has been retained in pullRepository, no need to retain again |
| p.graph.Retain(sessionID, history[1:]...) |
| defer p.graph.Release(sessionID, history[1:]...) |
| |
| layersDownloaded := false |
| for i := len(history) - 1; i >= 0; i-- { |
| id := history[i] |
| |
| // ensure no two downloads of the same layer happen at the same time |
| if c, err := p.poolAdd("pull", "layer:"+id); err != nil { |
| logrus.Debugf("Image (id: %s) pull is already running, skipping: %v", id, err) |
| <-c |
| } |
| defer p.poolRemove("pull", "layer:"+id) |
| |
| if !p.graph.Exists(id) { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Pulling metadata", nil)) |
| var ( |
| imgJSON []byte |
| imgSize int |
| err error |
| img *image.Image |
| ) |
| retries := 5 |
| for j := 1; j <= retries; j++ { |
| imgJSON, imgSize, err = p.session.GetRemoteImageJSON(id, endpoint) |
| if err != nil && j == retries { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) |
| return layersDownloaded, err |
| } else if err != nil { |
| time.Sleep(time.Duration(j) * 500 * time.Millisecond) |
| continue |
| } |
| img, err = image.NewImgJSON(imgJSON) |
| layersDownloaded = true |
| if err != nil && j == retries { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) |
| return layersDownloaded, fmt.Errorf("Failed to parse json: %s", err) |
| } else if err != nil { |
| time.Sleep(time.Duration(j) * 500 * time.Millisecond) |
| continue |
| } else { |
| break |
| } |
| } |
| |
| for j := 1; j <= retries; j++ { |
| // Get the layer |
| status := "Pulling fs layer" |
| if j > 1 { |
| status = fmt.Sprintf("Pulling fs layer [retries: %d]", j) |
| } |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), status, nil)) |
| layer, err := p.session.GetRemoteImageLayer(img.ID, endpoint, int64(imgSize)) |
| if uerr, ok := err.(*url.Error); ok { |
| err = uerr.Err |
| } |
| if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { |
| time.Sleep(time.Duration(j) * 500 * time.Millisecond) |
| continue |
| } else if err != nil { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error pulling dependent layers", nil)) |
| return layersDownloaded, err |
| } |
| layersDownloaded = true |
| defer layer.Close() |
| |
| err = p.graph.Register(img, |
| progressreader.New(progressreader.Config{ |
| In: layer, |
| Out: out, |
| Formatter: p.sf, |
| Size: imgSize, |
| NewLines: false, |
| ID: stringid.TruncateID(id), |
| Action: "Downloading", |
| })) |
| if terr, ok := err.(net.Error); ok && terr.Timeout() && j < retries { |
| time.Sleep(time.Duration(j) * 500 * time.Millisecond) |
| continue |
| } else if err != nil { |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Error downloading dependent layers", nil)) |
| return layersDownloaded, err |
| } else { |
| break |
| } |
| } |
| } |
| out.Write(p.sf.FormatProgress(stringid.TruncateID(id), "Download complete", nil)) |
| } |
| return layersDownloaded, nil |
| } |