| /* |
| Copyright The containerd Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| /* |
| Copyright 2019 The Go Authors. All rights reserved. |
| Use of this source code is governed by a BSD-style |
| license that can be found in the LICENSE file. |
| */ |
| |
| package estargz |
| |
| import ( |
| "archive/tar" |
| "bytes" |
| "compress/gzip" |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "path" |
| "runtime" |
| "strings" |
| "sync" |
| |
| "github.com/containerd/stargz-snapshotter/estargz/errorutil" |
| "github.com/klauspost/compress/zstd" |
| digest "github.com/opencontainers/go-digest" |
| "golang.org/x/sync/errgroup" |
| ) |
| |
| type options struct { |
| chunkSize int |
| compressionLevel int |
| prioritizedFiles []string |
| missedPrioritizedFiles *[]string |
| compression Compression |
| ctx context.Context |
| minChunkSize int |
| } |
| |
| type Option func(o *options) error |
| |
| // WithChunkSize option specifies the chunk size of eStargz blob to build. |
| func WithChunkSize(chunkSize int) Option { |
| return func(o *options) error { |
| o.chunkSize = chunkSize |
| return nil |
| } |
| } |
| |
| // WithCompressionLevel option specifies the gzip compression level. |
| // The default is gzip.BestCompression. |
| // This option will be ignored if WithCompression option is used. |
| // See also: https://godoc.org/compress/gzip#pkg-constants |
| func WithCompressionLevel(level int) Option { |
| return func(o *options) error { |
| o.compressionLevel = level |
| return nil |
| } |
| } |
| |
| // WithPrioritizedFiles option specifies the list of prioritized files. |
| // These files must be complete paths that are absolute or relative to "/" |
| // For example, all of "foo/bar", "/foo/bar", "./foo/bar" and "../foo/bar" |
| // are treated as "/foo/bar". |
| func WithPrioritizedFiles(files []string) Option { |
| return func(o *options) error { |
| o.prioritizedFiles = files |
| return nil |
| } |
| } |
| |
| // WithAllowPrioritizeNotFound makes Build continue the execution even if some |
| // of prioritized files specified by WithPrioritizedFiles option aren't found |
| // in the input tar. Instead, this records all missed file names to the passed |
| // slice. |
| func WithAllowPrioritizeNotFound(missedFiles *[]string) Option { |
| return func(o *options) error { |
| if missedFiles == nil { |
| return fmt.Errorf("WithAllowPrioritizeNotFound: slice must be passed") |
| } |
| o.missedPrioritizedFiles = missedFiles |
| return nil |
| } |
| } |
| |
| // WithCompression specifies compression algorithm to be used. |
| // Default is gzip. |
| func WithCompression(compression Compression) Option { |
| return func(o *options) error { |
| o.compression = compression |
| return nil |
| } |
| } |
| |
| // WithContext specifies a context that can be used for clean canceleration. |
| func WithContext(ctx context.Context) Option { |
| return func(o *options) error { |
| o.ctx = ctx |
| return nil |
| } |
| } |
| |
| // WithMinChunkSize option specifies the minimal number of bytes of data |
| // must be written in one gzip stream. |
| // By increasing this number, one gzip stream can contain multiple files |
| // and it hopefully leads to smaller result blob. |
| // NOTE: This adds a TOC property that old reader doesn't understand. |
| func WithMinChunkSize(minChunkSize int) Option { |
| return func(o *options) error { |
| o.minChunkSize = minChunkSize |
| return nil |
| } |
| } |
| |
| // Blob is an eStargz blob. |
| type Blob struct { |
| io.ReadCloser |
| diffID digest.Digester |
| tocDigest digest.Digest |
| } |
| |
| // DiffID returns the digest of uncompressed blob. |
| // It is only valid to call DiffID after Close. |
| func (b *Blob) DiffID() digest.Digest { |
| return b.diffID.Digest() |
| } |
| |
| // TOCDigest returns the digest of uncompressed TOC JSON. |
| func (b *Blob) TOCDigest() digest.Digest { |
| return b.tocDigest |
| } |
| |
| // Build builds an eStargz blob which is an extended version of stargz, from a blob (gzip, zstd |
| // or plain tar) passed through the argument. If there are some prioritized files are listed in |
| // the option, these files are grouped as "prioritized" and can be used for runtime optimization |
| // (e.g. prefetch). This function builds a blob in parallel, with dividing that blob into several |
| // (at least the number of runtime.GOMAXPROCS(0)) sub-blobs. |
| func Build(tarBlob *io.SectionReader, opt ...Option) (_ *Blob, rErr error) { |
| var opts options |
| opts.compressionLevel = gzip.BestCompression // BestCompression by default |
| for _, o := range opt { |
| if err := o(&opts); err != nil { |
| return nil, err |
| } |
| } |
| if opts.compression == nil { |
| opts.compression = newGzipCompressionWithLevel(opts.compressionLevel) |
| } |
| layerFiles := newTempFiles() |
| ctx := opts.ctx |
| if ctx == nil { |
| ctx = context.Background() |
| } |
| done := make(chan struct{}) |
| defer close(done) |
| go func() { |
| select { |
| case <-done: |
| // nop |
| case <-ctx.Done(): |
| layerFiles.CleanupAll() |
| } |
| }() |
| defer func() { |
| if rErr != nil { |
| if err := layerFiles.CleanupAll(); err != nil { |
| rErr = fmt.Errorf("failed to cleanup tmp files: %v: %w", err, rErr) |
| } |
| } |
| if cErr := ctx.Err(); cErr != nil { |
| rErr = fmt.Errorf("error from context %q: %w", cErr, rErr) |
| } |
| }() |
| tarBlob, err := decompressBlob(tarBlob, layerFiles) |
| if err != nil { |
| return nil, err |
| } |
| entries, err := sortEntries(tarBlob, opts.prioritizedFiles, opts.missedPrioritizedFiles) |
| if err != nil { |
| return nil, err |
| } |
| var tarParts [][]*entry |
| if opts.minChunkSize > 0 { |
| // Each entry needs to know the size of the current gzip stream so they |
| // cannot be processed in parallel. |
| tarParts = [][]*entry{entries} |
| } else { |
| tarParts = divideEntries(entries, runtime.GOMAXPROCS(0)) |
| } |
| writers := make([]*Writer, len(tarParts)) |
| payloads := make([]*os.File, len(tarParts)) |
| var mu sync.Mutex |
| var eg errgroup.Group |
| for i, parts := range tarParts { |
| i, parts := i, parts |
| // builds verifiable stargz sub-blobs |
| eg.Go(func() error { |
| esgzFile, err := layerFiles.TempFile("", "esgzdata") |
| if err != nil { |
| return err |
| } |
| sw := NewWriterWithCompressor(esgzFile, opts.compression) |
| sw.ChunkSize = opts.chunkSize |
| sw.MinChunkSize = opts.minChunkSize |
| if sw.needsOpenGzEntries == nil { |
| sw.needsOpenGzEntries = make(map[string]struct{}) |
| } |
| for _, f := range []string{PrefetchLandmark, NoPrefetchLandmark} { |
| sw.needsOpenGzEntries[f] = struct{}{} |
| } |
| if err := sw.AppendTar(readerFromEntries(parts...)); err != nil { |
| return err |
| } |
| mu.Lock() |
| writers[i] = sw |
| payloads[i] = esgzFile |
| mu.Unlock() |
| return nil |
| }) |
| } |
| if err := eg.Wait(); err != nil { |
| rErr = err |
| return nil, err |
| } |
| tocAndFooter, tocDgst, err := closeWithCombine(writers...) |
| if err != nil { |
| rErr = err |
| return nil, err |
| } |
| var rs []io.Reader |
| for _, p := range payloads { |
| fs, err := fileSectionReader(p) |
| if err != nil { |
| return nil, err |
| } |
| rs = append(rs, fs) |
| } |
| diffID := digest.Canonical.Digester() |
| pr, pw := io.Pipe() |
| go func() { |
| r, err := opts.compression.Reader(io.TeeReader(io.MultiReader(append(rs, tocAndFooter)...), pw)) |
| if err != nil { |
| pw.CloseWithError(err) |
| return |
| } |
| defer r.Close() |
| if _, err := io.Copy(diffID.Hash(), r); err != nil { |
| pw.CloseWithError(err) |
| return |
| } |
| pw.Close() |
| }() |
| return &Blob{ |
| ReadCloser: readCloser{ |
| Reader: pr, |
| closeFunc: layerFiles.CleanupAll, |
| }, |
| tocDigest: tocDgst, |
| diffID: diffID, |
| }, nil |
| } |
| |
| // closeWithCombine takes unclosed Writers and close them. This also returns the |
| // toc that combined all Writers into. |
| // Writers doesn't write TOC and footer to the underlying writers so they can be |
| // combined into a single eStargz and tocAndFooter returned by this function can |
| // be appended at the tail of that combined blob. |
| func closeWithCombine(ws ...*Writer) (tocAndFooterR io.Reader, tocDgst digest.Digest, err error) { |
| if len(ws) == 0 { |
| return nil, "", fmt.Errorf("at least one writer must be passed") |
| } |
| for _, w := range ws { |
| if w.closed { |
| return nil, "", fmt.Errorf("writer must be unclosed") |
| } |
| defer func(w *Writer) { w.closed = true }(w) |
| if err := w.closeGz(); err != nil { |
| return nil, "", err |
| } |
| if err := w.bw.Flush(); err != nil { |
| return nil, "", err |
| } |
| } |
| var ( |
| mtoc = new(JTOC) |
| currentOffset int64 |
| ) |
| mtoc.Version = ws[0].toc.Version |
| for _, w := range ws { |
| for _, e := range w.toc.Entries { |
| // Recalculate Offset of non-empty files/chunks |
| if (e.Type == "reg" && e.Size > 0) || e.Type == "chunk" { |
| e.Offset += currentOffset |
| } |
| mtoc.Entries = append(mtoc.Entries, e) |
| } |
| if w.toc.Version > mtoc.Version { |
| mtoc.Version = w.toc.Version |
| } |
| currentOffset += w.cw.n |
| } |
| |
| return tocAndFooter(ws[0].compressor, mtoc, currentOffset) |
| } |
| |
| func tocAndFooter(compressor Compressor, toc *JTOC, offset int64) (io.Reader, digest.Digest, error) { |
| buf := new(bytes.Buffer) |
| tocDigest, err := compressor.WriteTOCAndFooter(buf, offset, toc, nil) |
| if err != nil { |
| return nil, "", err |
| } |
| return buf, tocDigest, nil |
| } |
| |
| // divideEntries divides passed entries to the parts at least the number specified by the |
| // argument. |
| func divideEntries(entries []*entry, minPartsNum int) (set [][]*entry) { |
| var estimatedSize int64 |
| for _, e := range entries { |
| estimatedSize += e.header.Size |
| } |
| unitSize := estimatedSize / int64(minPartsNum) |
| var ( |
| nextEnd = unitSize |
| offset int64 |
| ) |
| set = append(set, []*entry{}) |
| for _, e := range entries { |
| set[len(set)-1] = append(set[len(set)-1], e) |
| offset += e.header.Size |
| if offset > nextEnd { |
| set = append(set, []*entry{}) |
| nextEnd += unitSize |
| } |
| } |
| return |
| } |
| |
// errNotFound is the sentinel error wrapped by moveRec when the requested file
// exists neither in the source nor in the destination. sortEntries checks for
// it with errors.Is to tolerate missing prioritized files.
var errNotFound = errors.New("not found")
| |
| // sortEntries reads the specified tar blob and returns a list of tar entries. |
| // If some of prioritized files are specified, the list starts from these |
| // files with keeping the order specified by the argument. |
| func sortEntries(in io.ReaderAt, prioritized []string, missedPrioritized *[]string) ([]*entry, error) { |
| |
| // Import tar file. |
| intar, err := importTar(in) |
| if err != nil { |
| return nil, fmt.Errorf("failed to sort: %w", err) |
| } |
| |
| // Sort the tar file respecting to the prioritized files list. |
| sorted := &tarFile{} |
| for _, l := range prioritized { |
| if err := moveRec(l, intar, sorted); err != nil { |
| if errors.Is(err, errNotFound) && missedPrioritized != nil { |
| *missedPrioritized = append(*missedPrioritized, l) |
| continue // allow not found |
| } |
| return nil, fmt.Errorf("failed to sort tar entries: %w", err) |
| } |
| } |
| if len(prioritized) == 0 { |
| sorted.add(&entry{ |
| header: &tar.Header{ |
| Name: NoPrefetchLandmark, |
| Typeflag: tar.TypeReg, |
| Size: int64(len([]byte{landmarkContents})), |
| }, |
| payload: bytes.NewReader([]byte{landmarkContents}), |
| }) |
| } else { |
| sorted.add(&entry{ |
| header: &tar.Header{ |
| Name: PrefetchLandmark, |
| Typeflag: tar.TypeReg, |
| Size: int64(len([]byte{landmarkContents})), |
| }, |
| payload: bytes.NewReader([]byte{landmarkContents}), |
| }) |
| } |
| |
| // Dump all entry and concatinate them. |
| return append(sorted.dump(), intar.dump()...), nil |
| } |
| |
| // readerFromEntries returns a reader of tar archive that contains entries passed |
| // through the arguments. |
| func readerFromEntries(entries ...*entry) io.Reader { |
| pr, pw := io.Pipe() |
| go func() { |
| tw := tar.NewWriter(pw) |
| defer tw.Close() |
| for _, entry := range entries { |
| if err := tw.WriteHeader(entry.header); err != nil { |
| pw.CloseWithError(fmt.Errorf("Failed to write tar header: %v", err)) |
| return |
| } |
| if _, err := io.Copy(tw, entry.payload); err != nil { |
| pw.CloseWithError(fmt.Errorf("Failed to write tar payload: %v", err)) |
| return |
| } |
| } |
| pw.Close() |
| }() |
| return pr |
| } |
| |
| func importTar(in io.ReaderAt) (*tarFile, error) { |
| tf := &tarFile{} |
| pw, err := newCountReadSeeker(in) |
| if err != nil { |
| return nil, fmt.Errorf("failed to make position watcher: %w", err) |
| } |
| tr := tar.NewReader(pw) |
| |
| // Walk through all nodes. |
| for { |
| // Fetch and parse next header. |
| h, err := tr.Next() |
| if err != nil { |
| if err == io.EOF { |
| break |
| } |
| return nil, fmt.Errorf("failed to parse tar file, %w", err) |
| } |
| switch cleanEntryName(h.Name) { |
| case PrefetchLandmark, NoPrefetchLandmark: |
| // Ignore existing landmark |
| continue |
| } |
| |
| // Add entry. If it already exists, replace it. |
| if _, ok := tf.get(h.Name); ok { |
| tf.remove(h.Name) |
| } |
| tf.add(&entry{ |
| header: h, |
| payload: io.NewSectionReader(in, pw.currentPos(), h.Size), |
| }) |
| } |
| |
| return tf, nil |
| } |
| |
| func moveRec(name string, in *tarFile, out *tarFile) error { |
| name = cleanEntryName(name) |
| if name == "" { // root directory. stop recursion. |
| if e, ok := in.get(name); ok { |
| // entry of the root directory exists. we should move it as well. |
| // this case will occur if tar entries are prefixed with "./", "/", etc. |
| out.add(e) |
| in.remove(name) |
| } |
| return nil |
| } |
| |
| _, okIn := in.get(name) |
| _, okOut := out.get(name) |
| if !okIn && !okOut { |
| return fmt.Errorf("file: %q: %w", name, errNotFound) |
| } |
| |
| parent, _ := path.Split(strings.TrimSuffix(name, "/")) |
| if err := moveRec(parent, in, out); err != nil { |
| return err |
| } |
| if e, ok := in.get(name); ok && e.header.Typeflag == tar.TypeLink { |
| if err := moveRec(e.header.Linkname, in, out); err != nil { |
| return err |
| } |
| } |
| if e, ok := in.get(name); ok { |
| out.add(e) |
| in.remove(name) |
| } |
| return nil |
| } |
| |
// entry is a single tar entry: its header and a reader of its contents.
type entry struct {
	// header is the tar header of the entry.
	header *tar.Header
	// payload provides the file contents that are written after the header.
	payload io.ReadSeeker
}
| |
| type tarFile struct { |
| index map[string]*entry |
| stream []*entry |
| } |
| |
| func (f *tarFile) add(e *entry) { |
| if f.index == nil { |
| f.index = make(map[string]*entry) |
| } |
| f.index[cleanEntryName(e.header.Name)] = e |
| f.stream = append(f.stream, e) |
| } |
| |
| func (f *tarFile) remove(name string) { |
| name = cleanEntryName(name) |
| if f.index != nil { |
| delete(f.index, name) |
| } |
| var filtered []*entry |
| for _, e := range f.stream { |
| if cleanEntryName(e.header.Name) == name { |
| continue |
| } |
| filtered = append(filtered, e) |
| } |
| f.stream = filtered |
| } |
| |
| func (f *tarFile) get(name string) (e *entry, ok bool) { |
| if f.index == nil { |
| return nil, false |
| } |
| e, ok = f.index[cleanEntryName(name)] |
| return |
| } |
| |
| func (f *tarFile) dump() []*entry { |
| return f.stream |
| } |
| |
// readCloser turns a Reader into an io.ReadCloser whose Close runs an
// arbitrary callback.
type readCloser struct {
	io.Reader
	// closeFunc is invoked by Close; it must not be nil.
	closeFunc func() error
}

// Close invokes the close callback and returns its result.
func (rc readCloser) Close() error {
	closeFn := rc.closeFunc
	return closeFn()
}
| |
// fileSectionReader returns a section reader that covers the whole file, from
// offset 0 up to the size reported by Stat at the time of the call.
func fileSectionReader(file *os.File) (*io.SectionReader, error) {
	stat, statErr := file.Stat()
	if statErr != nil {
		return nil, statErr
	}
	size := stat.Size()
	return io.NewSectionReader(file, 0, size), nil
}
| |
| func newTempFiles() *tempFiles { |
| return &tempFiles{} |
| } |
| |
| type tempFiles struct { |
| files []*os.File |
| filesMu sync.Mutex |
| cleanupOnce sync.Once |
| } |
| |
| func (tf *tempFiles) TempFile(dir, pattern string) (*os.File, error) { |
| f, err := os.CreateTemp(dir, pattern) |
| if err != nil { |
| return nil, err |
| } |
| tf.filesMu.Lock() |
| tf.files = append(tf.files, f) |
| tf.filesMu.Unlock() |
| return f, nil |
| } |
| |
| func (tf *tempFiles) CleanupAll() (err error) { |
| tf.cleanupOnce.Do(func() { |
| err = tf.cleanupAll() |
| }) |
| return |
| } |
| |
| func (tf *tempFiles) cleanupAll() error { |
| tf.filesMu.Lock() |
| defer tf.filesMu.Unlock() |
| var allErr []error |
| for _, f := range tf.files { |
| if err := f.Close(); err != nil { |
| allErr = append(allErr, err) |
| } |
| if err := os.Remove(f.Name()); err != nil { |
| allErr = append(allErr, err) |
| } |
| } |
| tf.files = nil |
| return errorutil.Aggregate(allErr) |
| } |
| |
// newCountReadSeeker returns a countReadSeeker that reads r from offset 0.
// The returned error is always nil.
func newCountReadSeeker(r io.ReaderAt) (*countReadSeeker, error) {
	pos := int64(0)
	return &countReadSeeker{r: r, cPos: &pos}, nil
}

// countReadSeeker adapts an io.ReaderAt to a sequential reader that supports
// seeking and exposes its current position. All methods are serialized by mu.
type countReadSeeker struct {
	r    io.ReaderAt
	cPos *int64

	mu sync.Mutex
}

// Read reads from the current position and advances it by the number of bytes
// read. The position is advanced even when the underlying ReadAt reports an
// error together with data (e.g. a short read at the end of the input), so
// that the returned bytes are never delivered twice and currentPos stays
// accurate.
func (cr *countReadSeeker) Read(p []byte) (int, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	n, err := cr.r.ReadAt(p, *cr.cPos)
	if n > 0 {
		*cr.cPos += int64(n)
	}
	return n, err
}

// Seek sets the position for the next Read. Only io.SeekStart and
// io.SeekCurrent are supported; io.SeekEnd is rejected because the size of
// the underlying io.ReaderAt is unknown. A negative resulting position is an
// error and leaves the position unchanged.
func (cr *countReadSeeker) Seek(offset int64, whence int) (int64, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	switch whence {
	case io.SeekStart:
		// offset is already absolute.
	case io.SeekCurrent:
		offset += *cr.cPos
	case io.SeekEnd:
		return 0, fmt.Errorf("Unsupported whence: %v", whence)
	default:
		return 0, fmt.Errorf("Unknown whence: %v", whence)
	}

	if offset < 0 {
		return 0, fmt.Errorf("invalid offset")
	}
	*cr.cPos = offset
	return offset, nil
}

// currentPos returns the position the next Read starts from.
func (cr *countReadSeeker) currentPos() int64 {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	return *cr.cPos
}
| |
| func decompressBlob(org *io.SectionReader, tmp *tempFiles) (*io.SectionReader, error) { |
| if org.Size() < 4 { |
| return org, nil |
| } |
| src := make([]byte, 4) |
| if _, err := org.Read(src); err != nil && err != io.EOF { |
| return nil, err |
| } |
| var dR io.Reader |
| if bytes.Equal([]byte{0x1F, 0x8B, 0x08}, src[:3]) { |
| // gzip |
| dgR, err := gzip.NewReader(io.NewSectionReader(org, 0, org.Size())) |
| if err != nil { |
| return nil, err |
| } |
| defer dgR.Close() |
| dR = io.Reader(dgR) |
| } else if bytes.Equal([]byte{0x28, 0xb5, 0x2f, 0xfd}, src[:4]) { |
| // zstd |
| dzR, err := zstd.NewReader(io.NewSectionReader(org, 0, org.Size())) |
| if err != nil { |
| return nil, err |
| } |
| defer dzR.Close() |
| dR = io.Reader(dzR) |
| } else { |
| // uncompressed |
| return io.NewSectionReader(org, 0, org.Size()), nil |
| } |
| b, err := tmp.TempFile("", "uncompresseddata") |
| if err != nil { |
| return nil, err |
| } |
| if _, err := io.Copy(b, dR); err != nil { |
| return nil, err |
| } |
| return fileSectionReader(b) |
| } |