| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package artifacts |
| |
| import ( |
| "compress/flate" |
| "compress/gzip" |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "os" |
| "path" |
| "path/filepath" |
| "strings" |
| |
| "cloud.google.com/go/storage" |
| "go.chromium.org/luci/common/retry" |
| "go.chromium.org/luci/common/retry/transient" |
| "google.golang.org/api/iterator" |
| ) |
| |
// DefaultTestOutputName is the default name of a test's output file. We only
// store a single file containing a test's combined stdout and stderr streams
// today. This will change in the future.
const DefaultTestOutputName = "stdout-stderr.txt"
| |
| // ErrNothingMatchedPrefix is returned when `List()` fails to find any objects |
| // in a bucket that match the given prefix. |
| type ErrNothingMatchedPrefix struct { |
| BucketName string |
| Prefix string |
| } |
| |
| func (e *ErrNothingMatchedPrefix) Error() string { |
| return fmt.Sprintf("nothing matched prefix gs://%s/%s", e.BucketName, e.Prefix) |
| } |
| |
// Directory is an interface for interacting with a Cloud Storage "directory".
type Directory interface {
	// Object returns a handle to the object at the given path, relative to
	// this directory.
	Object(string) *storage.ObjectHandle
	// CopyFile downloads the object at src to the local path dest, returning
	// the object's size in bytes (compressed, if applicable).
	CopyFile(ctx context.Context, src, dest string) (int64, error)
	// List returns the paths of all objects under the given prefix, relative
	// to this directory.
	List(context.Context, string) ([]string, error)
}
| |
// BuildDirectory represents a Fuchsia CI build's artifact directory. Refer to the
// layout in doc.go for the layout of this directory. When amending the layout, prefer
// adding convenience methods on this type to encourage all clients to create objects
// within this BuildDirectory using the same layout.
type BuildDirectory struct {
	*directory
}
| |
| // NewTestOutputObject creates a new ObjectHandle to hold the output of the given test |
| // execution in this BuildDirectory. testName is the name of the test, envName is the |
| // canonical name of the test environment. Both are normalized according to |
| // normalizePathSegment. |
| func (d BuildDirectory) NewTestOutputObject(ctx context.Context, testName, envName string) *storage.ObjectHandle { |
| return d.cd("tests").cd(testName).cd(envName).Object(DefaultTestOutputName) |
| } |
| |
// directory is a handle to a Cloud Storage "directory". It provides a minimal
// filesystem-like interface for a Cloud Storage object hierarchy where "/" is used as the
// path separator. Any methods added to this struct are forwarded to other directory types.
type directory struct {
	bucket     *storage.BucketHandle // handle to the enclosing GCS bucket
	bucketName string                // bucket name, used when reporting errors
	root       string                // object-name prefix of this directory within the bucket
}
| |
| // Object returns a handle to the given object within this directory. objPath is the path to |
| // the object relative to this directory. |
| func (d *directory) Object(objPath string) *storage.ObjectHandle { |
| objPath = path.Join(d.root, objPath) |
| return d.bucket.Object(objPath) |
| } |
| |
| type gzipReader struct { |
| gReader io.ReadCloser |
| reader io.ReadCloser |
| } |
| |
| // newGzipReader takes a reader to a gzip-compressed file and returns a |
| // gzipReader to read the uncompressed data. |
| func newGzipReader(r io.ReadCloser) (*gzipReader, error) { |
| gr, err := gzip.NewReader(r) |
| if err != nil { |
| return nil, fmt.Errorf("failed to get gzip reader: %w", err) |
| } |
| return &gzipReader{gr, r}, nil |
| } |
| |
| func (r *gzipReader) Read(p []byte) (int, error) { |
| return r.gReader.Read(p) |
| } |
| |
| func (r *gzipReader) Close() error { |
| err := r.gReader.Close() |
| if err == nil { |
| err = r.reader.Close() |
| } else { |
| r.reader.Close() |
| } |
| return err |
| } |
| |
| func getUncompressedReader(ctx context.Context, obj *storage.ObjectHandle, objAttrs *storage.ObjectAttrs) (io.ReadCloser, error) { |
| if objAttrs.ContentEncoding != "gzip" { |
| return obj.NewReader(ctx) |
| } |
| r, err := obj.ReadCompressed(true).NewReader(ctx) |
| if err != nil { |
| return nil, fmt.Errorf("failed to read %q from GCS: %w", obj.ObjectName(), err) |
| } |
| if r.Attrs.ContentEncoding != "gzip" { |
| return nil, fmt.Errorf("content-encoding expected: gzip, actual: %s", r.Attrs.ContentEncoding) |
| } |
| |
| return newGzipReader(r) |
| } |
| |
| // CopyFile copies the object from src to dst, creating all the parent directories |
| // of dest if they don't exist. It returns the (compressed, if applicable) size of |
| // the object even if it fails to download it. |
| func (d *directory) CopyFile(ctx context.Context, src, dest string) (int64, error) { |
| var n int64 |
| err := retry.Retry(ctx, transient.Only(retry.Default), func() error { |
| var err error |
| n, err = d.copyOnce(ctx, src, dest) |
| if isTransientDownloadError(err) { |
| return transient.Tag.Apply(err) |
| } |
| return err |
| }, nil) |
| return n, err |
| } |
| |
| func (d *directory) copyOnce(ctx context.Context, src, dest string) (int64, error) { |
| obj := d.Object(src) |
| objAttrs, err := obj.Attrs(ctx) |
| if err != nil { |
| return 0, fmt.Errorf("failed to get attrs for %q from GCS: %w", obj.ObjectName(), err) |
| } |
| size := objAttrs.Size |
| input, err := getUncompressedReader(ctx, obj, objAttrs) |
| if err != nil { |
| return size, err |
| } |
| defer input.Close() |
| |
| if err := os.MkdirAll(filepath.Dir(dest), os.ModePerm); err != nil { |
| return size, err |
| } |
| output, err := os.Create(dest) |
| if err != nil { |
| return size, err |
| } |
| defer output.Close() |
| |
| _, err = io.Copy(output, input) |
| return size, err |
| } |
| |
| // isTransientDownloadError returns whether an error corresponds to a GCS |
| // download failure mode that's known to be transient. |
| func isTransientDownloadError(err error) bool { |
| if err == nil { |
| return false |
| } |
| if strings.Contains(err.Error(), "storage: bad CRC on read") || errors.Is(err, gzip.ErrChecksum) { |
| // Checksum failure likely indicates transient corruption during download. |
| return true |
| } |
| var flateErr flate.CorruptInputError |
| if errors.As(err, &flateErr) { |
| // Ditto, likely indicates transient corruption. |
| return true |
| } |
| return false |
| } |
| |
| // List lists all of the objects in this directory with the given prefix. If prefix == "", |
| // it will list everything in this directory. |
| func (d *directory) List(ctx context.Context, prefix string) ([]string, error) { |
| prefix = path.Join(d.root, prefix) |
| iter := d.bucket.Objects(ctx, &storage.Query{ |
| Prefix: prefix, |
| }) |
| |
| var items []string |
| for { |
| attrs, err := iter.Next() |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, err |
| } |
| items = append(items, strings.TrimPrefix(attrs.Name, d.root+"/")) |
| } |
| |
| if len(items) == 0 { |
| return nil, &ErrNothingMatchedPrefix{ |
| BucketName: d.bucketName, |
| Prefix: prefix, |
| } |
| } |
| return items, nil |
| } |
| |
| // CD returns a handle to some child directory of this directory. directChild should be a |
| // direct child of the current directory, not a grandchild or any entry deeper in the |
| // tree. The child's name is normalized according to normalizePathSegment, so using a |
| // nested path may result in an unexpected file tree. |
| func (d *directory) cd(directChild string) *directory { |
| return &directory{ |
| bucket: d.bucket, |
| root: fmt.Sprintf("%s/%s", d.root, normalizePathSegment(directChild)), |
| } |
| } |