blob: 47ad89445568c5c5db38ae5141b48b7196f37fe5 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package artifacts
import (
"compress/flate"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"cloud.google.com/go/storage"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
"google.golang.org/api/iterator"
)
// The default name of a test's output file. We only store a single file containing a
// test's combined stdout and stderr streams today. This will change in the future.
const DefaultTestOutputName = "stdout-stderr.txt"
// ErrNothingMatchedPrefix is returned when `List()` fails to find any objects
// in a bucket that match the given prefix.
type ErrNothingMatchedPrefix struct {
BucketName string
Prefix string
}
func (e *ErrNothingMatchedPrefix) Error() string {
return fmt.Sprintf("nothing matched prefix gs://%s/%s", e.BucketName, e.Prefix)
}
// Directory is an interface for interacting with a Cloud Storage "directory".
type Directory interface {
Object(string) *storage.ObjectHandle
CopyFile(ctx context.Context, src, dest string) (int64, error)
List(context.Context, string) ([]string, error)
}
// BuildDirectory represents a Fuchsia CI build's artifact directory. Refer to the
// layout in doc.go for the layout of this directory. When amending the layout, prefer
// adding convenience methods on this type to encourages all clients to create objects
// within this BuildDirectory using the same layout.
type BuildDirectory struct {
*directory
}
// NewTestOutputObject creates a new ObjectHandle to hold the output of the given test
// execution in this BuildDirectory. testName is the name of the test, envName is the
// canonical name of the test environment. Both are normalized according to
// normalizePathSegment.
func (d BuildDirectory) NewTestOutputObject(ctx context.Context, testName, envName string) *storage.ObjectHandle {
return d.cd("tests").cd(testName).cd(envName).Object(DefaultTestOutputName)
}
// directory is a handle to a Cloud Storage "directory". It provides a minimal
// filesystem-like interface for a Cloud Storage object hierarchy where "/" is used as the
// path separator. Any methods added to this struct are forward to other directory types.
type directory struct {
bucket *storage.BucketHandle
bucketName string
root string
}
// Object returns a handle to the given object within this directory. objPath is the path to
// the object relative to this directory.
func (d *directory) Object(objPath string) *storage.ObjectHandle {
objPath = path.Join(d.root, objPath)
return d.bucket.Object(objPath)
}
type gzipReader struct {
gReader io.ReadCloser
reader io.ReadCloser
}
// newGzipReader takes a reader to a gzip-compressed file and returns a
// gzipReader to read the uncompressed data.
func newGzipReader(r io.ReadCloser) (*gzipReader, error) {
gr, err := gzip.NewReader(r)
if err != nil {
return nil, fmt.Errorf("failed to get gzip reader: %w", err)
}
return &gzipReader{gr, r}, nil
}
func (r *gzipReader) Read(p []byte) (int, error) {
return r.gReader.Read(p)
}
func (r *gzipReader) Close() error {
err := r.gReader.Close()
if err == nil {
err = r.reader.Close()
} else {
r.reader.Close()
}
return err
}
func getUncompressedReader(ctx context.Context, obj *storage.ObjectHandle, objAttrs *storage.ObjectAttrs) (io.ReadCloser, error) {
if objAttrs.ContentEncoding != "gzip" {
return obj.NewReader(ctx)
}
r, err := obj.ReadCompressed(true).NewReader(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read %q from GCS: %w", obj.ObjectName(), err)
}
if r.Attrs.ContentEncoding != "gzip" {
return nil, fmt.Errorf("content-encoding expected: gzip, actual: %s", r.Attrs.ContentEncoding)
}
return newGzipReader(r)
}
// CopyFile copies the object from src to dst, creating all the parent directories
// of dest if they don't exist. It returns the (compressed, if applicable) size of
// the object even if it fails to download it.
func (d *directory) CopyFile(ctx context.Context, src, dest string) (int64, error) {
var n int64
err := retry.Retry(ctx, transient.Only(retry.Default), func() error {
var err error
n, err = d.copyOnce(ctx, src, dest)
if isTransientDownloadError(err) {
return transient.Tag.Apply(err)
}
return err
}, nil)
return n, err
}
func (d *directory) copyOnce(ctx context.Context, src, dest string) (int64, error) {
obj := d.Object(src)
objAttrs, err := obj.Attrs(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get attrs for %q from GCS: %w", obj.ObjectName(), err)
}
size := objAttrs.Size
input, err := getUncompressedReader(ctx, obj, objAttrs)
if err != nil {
return size, err
}
defer input.Close()
if err := os.MkdirAll(filepath.Dir(dest), os.ModePerm); err != nil {
return size, err
}
output, err := os.Create(dest)
if err != nil {
return size, err
}
defer output.Close()
_, err = io.Copy(output, input)
return size, err
}
// isTransientDownloadError returns whether an error corresponds to a GCS
// download failure mode that's known to be transient.
func isTransientDownloadError(err error) bool {
if err == nil {
return false
}
if strings.Contains(err.Error(), "storage: bad CRC on read") || errors.Is(err, gzip.ErrChecksum) {
// Checksum failure likely indicates transient corruption during download.
return true
}
var flateErr flate.CorruptInputError
if errors.As(err, &flateErr) {
// Ditto, likely indicates transient corruption.
return true
}
return false
}
// List lists all of the objects in this directory with the given prefix. If prefix == "",
// it will list everything in this directory.
func (d *directory) List(ctx context.Context, prefix string) ([]string, error) {
prefix = path.Join(d.root, prefix)
iter := d.bucket.Objects(ctx, &storage.Query{
Prefix: prefix,
})
var items []string
for {
attrs, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
items = append(items, strings.TrimPrefix(attrs.Name, d.root+"/"))
}
if len(items) == 0 {
return nil, &ErrNothingMatchedPrefix{
BucketName: d.bucketName,
Prefix: prefix,
}
}
return items, nil
}
// CD returns a handle to some child directory of this directory. directChild should be a
// direct child of the current directory, not a grandchild or any entry deeper in the
// tree. The child's name is normalized according to normalizePathSegment, so using a
// nested path may result in an unexpected file tree.
func (d *directory) cd(directChild string) *directory {
return &directory{
bucket: d.bucket,
root: fmt.Sprintf("%s/%s", d.root, normalizePathSegment(directChild)),
}
}