| package xfer |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "time" |
| |
| "github.com/Sirupsen/logrus" |
| "github.com/docker/docker/image" |
| "github.com/docker/docker/layer" |
| "github.com/docker/docker/pkg/archive" |
| "github.com/docker/docker/pkg/ioutils" |
| "github.com/docker/docker/pkg/progress" |
| "golang.org/x/net/context" |
| ) |
| |
| const maxDownloadAttempts = 5 |
| |
| // LayerDownloadManager figures out which layers need to be downloaded, then |
| // registers and downloads those, taking into account dependencies between |
| // layers. |
| type LayerDownloadManager struct { |
| layerStore layer.Store |
| tm TransferManager |
| } |
| |
| // NewLayerDownloadManager returns a new LayerDownloadManager. |
| func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int) *LayerDownloadManager { |
| return &LayerDownloadManager{ |
| layerStore: layerStore, |
| tm: NewTransferManager(concurrencyLimit), |
| } |
| } |
| |
| type downloadTransfer struct { |
| Transfer |
| |
| layerStore layer.Store |
| layer layer.Layer |
| err error |
| } |
| |
| // result returns the layer resulting from the download, if the download |
| // and registration were successful. |
| func (d *downloadTransfer) result() (layer.Layer, error) { |
| return d.layer, d.err |
| } |
| |
| // A DownloadDescriptor references a layer that may need to be downloaded. |
| type DownloadDescriptor interface { |
| // Key returns the key used to deduplicate downloads. |
| Key() string |
| // ID returns the ID for display purposes. |
| ID() string |
| // DiffID should return the DiffID for this layer, or an error |
| // if it is unknown (for example, if it has not been downloaded |
| // before). |
| DiffID() (layer.DiffID, error) |
| // Download is called to perform the download. |
| Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) |
| // Close is called when the download manager is finished with this |
| // descriptor and will not call Download again or read from the reader |
| // that Download returned. |
| Close() |
| } |
| |
| // DownloadDescriptorWithRegistered is a DownloadDescriptor that has an |
| // additional Registered method which gets called after a downloaded layer is |
| // registered. This allows the user of the download manager to know the DiffID |
| // of each registered layer. This method is called if a cast to |
| // DownloadDescriptorWithRegistered is successful. |
| type DownloadDescriptorWithRegistered interface { |
| DownloadDescriptor |
| Registered(diffID layer.DiffID) |
| } |
| |
| // Download is a blocking function which ensures the requested layers are |
| // present in the layer store. It uses the string returned by the Key method to |
| // deduplicate downloads. If a given layer is not already known to present in |
| // the layer store, and the key is not used by an in-progress download, the |
| // Download method is called to get the layer tar data. Layers are then |
| // registered in the appropriate order. The caller must call the returned |
| // release function once it is is done with the returned RootFS object. |
| func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) { |
| var ( |
| topLayer layer.Layer |
| topDownload *downloadTransfer |
| watcher *Watcher |
| missingLayer bool |
| transferKey = "" |
| downloadsByKey = make(map[string]*downloadTransfer) |
| ) |
| |
| rootFS := initialRootFS |
| for _, descriptor := range layers { |
| key := descriptor.Key() |
| transferKey += key |
| |
| if !missingLayer { |
| missingLayer = true |
| diffID, err := descriptor.DiffID() |
| if err == nil { |
| getRootFS := rootFS |
| getRootFS.Append(diffID) |
| l, err := ldm.layerStore.Get(getRootFS.ChainID()) |
| if err == nil { |
| // Layer already exists. |
| logrus.Debugf("Layer already exists: %s", descriptor.ID()) |
| progress.Update(progressOutput, descriptor.ID(), "Already exists") |
| if topLayer != nil { |
| layer.ReleaseAndLog(ldm.layerStore, topLayer) |
| } |
| topLayer = l |
| missingLayer = false |
| rootFS.Append(diffID) |
| continue |
| } |
| } |
| } |
| |
| // Does this layer have the same data as a previous layer in |
| // the stack? If so, avoid downloading it more than once. |
| var topDownloadUncasted Transfer |
| if existingDownload, ok := downloadsByKey[key]; ok { |
| xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload) |
| defer topDownload.Transfer.Release(watcher) |
| topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) |
| topDownload = topDownloadUncasted.(*downloadTransfer) |
| continue |
| } |
| |
| // Layer is not known to exist - download and register it. |
| progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer") |
| |
| var xferFunc DoFunc |
| if topDownload != nil { |
| xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload) |
| defer topDownload.Transfer.Release(watcher) |
| } else { |
| xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) |
| } |
| topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) |
| topDownload = topDownloadUncasted.(*downloadTransfer) |
| downloadsByKey[key] = topDownload |
| } |
| |
| if topDownload == nil { |
| return rootFS, func() { |
| if topLayer != nil { |
| layer.ReleaseAndLog(ldm.layerStore, topLayer) |
| } |
| }, nil |
| } |
| |
| // Won't be using the list built up so far - will generate it |
| // from downloaded layers instead. |
| rootFS.DiffIDs = []layer.DiffID{} |
| |
| defer func() { |
| if topLayer != nil { |
| layer.ReleaseAndLog(ldm.layerStore, topLayer) |
| } |
| }() |
| |
| select { |
| case <-ctx.Done(): |
| topDownload.Transfer.Release(watcher) |
| return rootFS, func() {}, ctx.Err() |
| case <-topDownload.Done(): |
| break |
| } |
| |
| l, err := topDownload.result() |
| if err != nil { |
| topDownload.Transfer.Release(watcher) |
| return rootFS, func() {}, err |
| } |
| |
| // Must do this exactly len(layers) times, so we don't include the |
| // base layer on Windows. |
| for range layers { |
| if l == nil { |
| topDownload.Transfer.Release(watcher) |
| return rootFS, func() {}, errors.New("internal error: too few parent layers") |
| } |
| rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...) |
| l = l.Parent() |
| } |
| return rootFS, func() { topDownload.Transfer.Release(watcher) }, err |
| } |
| |
| // makeDownloadFunc returns a function that performs the layer download and |
| // registration. If parentDownload is non-nil, it waits for that download to |
| // complete before the registration step, and registers the downloaded data |
| // on top of parentDownload's resulting layer. Otherwise, it registers the |
| // layer on top of the ChainID given by parentLayer. |
| func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc { |
| return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { |
| d := &downloadTransfer{ |
| Transfer: NewTransfer(), |
| layerStore: ldm.layerStore, |
| } |
| |
| go func() { |
| defer func() { |
| close(progressChan) |
| }() |
| |
| progressOutput := progress.ChanOutput(progressChan) |
| |
| select { |
| case <-start: |
| default: |
| progress.Update(progressOutput, descriptor.ID(), "Waiting") |
| <-start |
| } |
| |
| if parentDownload != nil { |
| // Did the parent download already fail or get |
| // cancelled? |
| select { |
| case <-parentDownload.Done(): |
| _, err := parentDownload.result() |
| if err != nil { |
| d.err = err |
| return |
| } |
| default: |
| } |
| } |
| |
| var ( |
| downloadReader io.ReadCloser |
| size int64 |
| err error |
| retries int |
| ) |
| |
| defer descriptor.Close() |
| |
| for { |
| downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput) |
| if err == nil { |
| break |
| } |
| |
| // If an error was returned because the context |
| // was cancelled, we shouldn't retry. |
| select { |
| case <-d.Transfer.Context().Done(): |
| d.err = err |
| return |
| default: |
| } |
| |
| retries++ |
| if _, isDNR := err.(DoNotRetry); isDNR || retries == maxDownloadAttempts { |
| logrus.Errorf("Download failed: %v", err) |
| d.err = err |
| return |
| } |
| |
| logrus.Errorf("Download failed, retrying: %v", err) |
| delay := retries * 5 |
| ticker := time.NewTicker(time.Second) |
| |
| selectLoop: |
| for { |
| progress.Updatef(progressOutput, descriptor.ID(), "Retrying in %d second%s", delay, (map[bool]string{true: "s"})[delay != 1]) |
| select { |
| case <-ticker.C: |
| delay-- |
| if delay == 0 { |
| ticker.Stop() |
| break selectLoop |
| } |
| case <-d.Transfer.Context().Done(): |
| ticker.Stop() |
| d.err = errors.New("download cancelled during retry delay") |
| return |
| } |
| |
| } |
| } |
| |
| close(inactive) |
| |
| if parentDownload != nil { |
| select { |
| case <-d.Transfer.Context().Done(): |
| d.err = errors.New("layer registration cancelled") |
| downloadReader.Close() |
| return |
| case <-parentDownload.Done(): |
| } |
| |
| l, err := parentDownload.result() |
| if err != nil { |
| d.err = err |
| downloadReader.Close() |
| return |
| } |
| parentLayer = l.ChainID() |
| } |
| |
| reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting") |
| defer reader.Close() |
| |
| inflatedLayerData, err := archive.DecompressStream(reader) |
| if err != nil { |
| d.err = fmt.Errorf("could not get decompression stream: %v", err) |
| return |
| } |
| |
| d.layer, err = d.layerStore.Register(inflatedLayerData, parentLayer) |
| if err != nil { |
| select { |
| case <-d.Transfer.Context().Done(): |
| d.err = errors.New("layer registration cancelled") |
| default: |
| d.err = fmt.Errorf("failed to register layer: %v", err) |
| } |
| return |
| } |
| |
| progress.Update(progressOutput, descriptor.ID(), "Pull complete") |
| withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered) |
| if hasRegistered { |
| withRegistered.Registered(d.layer.DiffID()) |
| } |
| |
| // Doesn't actually need to be its own goroutine, but |
| // done like this so we can defer close(c). |
| go func() { |
| <-d.Transfer.Released() |
| if d.layer != nil { |
| layer.ReleaseAndLog(d.layerStore, d.layer) |
| } |
| }() |
| }() |
| |
| return d |
| } |
| } |
| |
| // makeDownloadFuncFromDownload returns a function that performs the layer |
| // registration when the layer data is coming from an existing download. It |
| // waits for sourceDownload and parentDownload to complete, and then |
| // reregisters the data from sourceDownload's top layer on top of |
| // parentDownload. This function does not log progress output because it would |
| // interfere with the progress reporting for sourceDownload, which has the same |
| // Key. |
| func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc { |
| return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { |
| d := &downloadTransfer{ |
| Transfer: NewTransfer(), |
| layerStore: ldm.layerStore, |
| } |
| |
| go func() { |
| defer func() { |
| close(progressChan) |
| }() |
| |
| <-start |
| |
| close(inactive) |
| |
| select { |
| case <-d.Transfer.Context().Done(): |
| d.err = errors.New("layer registration cancelled") |
| return |
| case <-parentDownload.Done(): |
| } |
| |
| l, err := parentDownload.result() |
| if err != nil { |
| d.err = err |
| return |
| } |
| parentLayer := l.ChainID() |
| |
| // sourceDownload should have already finished if |
| // parentDownload finished, but wait for it explicitly |
| // to be sure. |
| select { |
| case <-d.Transfer.Context().Done(): |
| d.err = errors.New("layer registration cancelled") |
| return |
| case <-sourceDownload.Done(): |
| } |
| |
| l, err = sourceDownload.result() |
| if err != nil { |
| d.err = err |
| return |
| } |
| |
| layerReader, err := l.TarStream() |
| if err != nil { |
| d.err = err |
| return |
| } |
| defer layerReader.Close() |
| |
| d.layer, err = d.layerStore.Register(layerReader, parentLayer) |
| if err != nil { |
| d.err = fmt.Errorf("failed to register layer: %v", err) |
| return |
| } |
| |
| withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered) |
| if hasRegistered { |
| withRegistered.Registered(d.layer.DiffID()) |
| } |
| |
| // Doesn't actually need to be its own goroutine, but |
| // done like this so we can defer close(c). |
| go func() { |
| <-d.Transfer.Released() |
| if d.layer != nil { |
| layer.ReleaseAndLog(d.layerStore, d.layer) |
| } |
| }() |
| }() |
| |
| return d |
| } |
| } |