| package plugin |
| |
| import ( |
| "fmt" |
| "io" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| |
| "github.com/docker/docker/distribution/xfer" |
| "github.com/docker/docker/image" |
| "github.com/docker/docker/layer" |
| "github.com/docker/docker/pkg/archive" |
| "github.com/docker/docker/pkg/chrootarchive" |
| "github.com/docker/docker/pkg/progress" |
| "github.com/opencontainers/go-digest" |
| "github.com/pkg/errors" |
| "github.com/sirupsen/logrus" |
| "golang.org/x/net/context" |
| ) |
| |
// blobstore abstracts content-addressed storage of plugin blobs.
type blobstore interface {
	// New returns a writer whose contents can later be committed to
	// the store under the digest of the bytes written.
	New() (WriteCommitCloser, error)
	// Get opens the blob stored under dgst for reading.
	Get(dgst digest.Digest) (io.ReadCloser, error)
	// Size reports the size in bytes of the blob stored under dgst.
	Size(dgst digest.Digest) (int64, error)
}
| |
// basicBlobStore is a filesystem-backed blobstore rooted at path.
// Committed blobs live at <path>/<algorithm>/<hex>; in-progress
// writes go to <path>/tmp (see newBasicBlobStore and New).
type basicBlobStore struct {
	path string
}
| |
| func newBasicBlobStore(p string) (*basicBlobStore, error) { |
| tmpdir := filepath.Join(p, "tmp") |
| if err := os.MkdirAll(tmpdir, 0700); err != nil { |
| return nil, errors.Wrapf(err, "failed to mkdir %v", p) |
| } |
| return &basicBlobStore{path: p}, nil |
| } |
| |
| func (b *basicBlobStore) New() (WriteCommitCloser, error) { |
| f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion") |
| if err != nil { |
| return nil, errors.Wrap(err, "failed to create temp file") |
| } |
| return newInsertion(f), nil |
| } |
| |
| func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) { |
| return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) |
| } |
| |
| func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) { |
| stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) |
| if err != nil { |
| return 0, err |
| } |
| return stat.Size(), nil |
| } |
| |
| func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) { |
| for _, alg := range []string{string(digest.Canonical)} { |
| items, err := ioutil.ReadDir(filepath.Join(b.path, alg)) |
| if err != nil { |
| continue |
| } |
| for _, fi := range items { |
| if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists { |
| p := filepath.Join(b.path, alg, fi.Name()) |
| err := os.RemoveAll(p) |
| logrus.Debugf("cleaned up blob %v: %v", p, err) |
| } |
| } |
| } |
| |
| } |
| |
// WriteCommitCloser defines object that can be committed to blobstore.
type WriteCommitCloser interface {
	io.WriteCloser
	// Commit finalizes the written data and returns the digest it was
	// stored under.
	Commit() (digest.Digest, error)
}
| |
// insertion is an in-progress blob write: bytes written to it go both
// to a temp file and to a running digester (see newInsertion), so
// Commit can move the file into place under its content address.
type insertion struct {
	// Writer is a MultiWriter over f and digester's hash.
	io.Writer
	f *os.File
	digester digest.Digester
	// closed is set once Commit has closed f; it makes Close a no-op.
	closed bool
}
| |
| func newInsertion(tempFile *os.File) *insertion { |
| digester := digest.Canonical.Digester() |
| return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())} |
| } |
| |
| func (i *insertion) Commit() (digest.Digest, error) { |
| p := i.f.Name() |
| d := filepath.Join(filepath.Join(p, "../../")) |
| i.f.Sync() |
| defer os.RemoveAll(p) |
| if err := i.f.Close(); err != nil { |
| return "", err |
| } |
| i.closed = true |
| dgst := i.digester.Digest() |
| if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil { |
| return "", errors.Wrapf(err, "failed to mkdir %v", d) |
| } |
| if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil { |
| return "", errors.Wrapf(err, "failed to rename %v", p) |
| } |
| return dgst, nil |
| } |
| |
| func (i *insertion) Close() error { |
| if i.closed { |
| return nil |
| } |
| defer os.RemoveAll(i.f.Name()) |
| return i.f.Close() |
| } |
| |
// downloadManager accumulates plugin layers and config pulled through
// the distribution download machinery into a local blobstore.
type downloadManager struct {
	blobStore blobstore
	// tmpDir is where decompressed layer contents are applied.
	tmpDir string
	// blobs records the digests of downloaded layer blobs in order.
	blobs []digest.Digest
	// configDigest is the digest of the config blob stored by Put.
	configDigest digest.Digest
}
| |
| func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os layer.OS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) { |
| // TODO @jhowardmsft LCOW: May need revisiting. |
| for _, l := range layers { |
| b, err := dm.blobStore.New() |
| if err != nil { |
| return initialRootFS, nil, err |
| } |
| defer b.Close() |
| rc, _, err := l.Download(ctx, progressOutput) |
| if err != nil { |
| return initialRootFS, nil, errors.Wrap(err, "failed to download") |
| } |
| defer rc.Close() |
| r := io.TeeReader(rc, b) |
| inflatedLayerData, err := archive.DecompressStream(r) |
| if err != nil { |
| return initialRootFS, nil, err |
| } |
| digester := digest.Canonical.Digester() |
| if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil { |
| return initialRootFS, nil, err |
| } |
| initialRootFS.Append(layer.DiffID(digester.Digest())) |
| d, err := b.Commit() |
| if err != nil { |
| return initialRootFS, nil, err |
| } |
| dm.blobs = append(dm.blobs, d) |
| } |
| return initialRootFS, nil, nil |
| } |
| |
| func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) { |
| b, err := dm.blobStore.New() |
| if err != nil { |
| return "", err |
| } |
| defer b.Close() |
| n, err := b.Write(dt) |
| if err != nil { |
| return "", err |
| } |
| if n != len(dt) { |
| return "", io.ErrShortWrite |
| } |
| d, err := b.Commit() |
| dm.configDigest = d |
| return d, err |
| } |
| |
| func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) { |
| return nil, fmt.Errorf("digest not found") |
| } |
// RootFSAndOSFromConfig extracts the rootfs and operating system from
// a raw image config blob by delegating to configToRootFS (defined
// elsewhere in this package).
func (dm *downloadManager) RootFSAndOSFromConfig(c []byte) (*image.RootFS, layer.OS, error) {
	return configToRootFS(c)
}