// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main

import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"crypto/ed25519"
	"crypto/md5"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"io"
	"os"
	"os/signal"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"cloud.google.com/go/storage"
	"github.com/maruel/subcommands"
	"go.chromium.org/luci/auth"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/logging/gologger"
	"go.chromium.org/luci/common/retry/transient"
	"go.fuchsia.dev/infra/cmd/gcs-util/lib"
	"go.fuchsia.dev/infra/cmd/gcs-util/types"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/option"
)
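// cmdUp returns the "up" subcommand, which uploads the files listed in an
// input manifest to Google Cloud Storage.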
func cmdUp(authOpts auth.Options) *subcommands.Command {
return &subcommands.Command{
UsageLine: "up -bucket <bucket> -namespace <namespace> -manifest-path <manifest-path>",
ShortDesc: "Upload files from an input manifest to Google Cloud Storage.",
LongDesc: "Upload files from an input manifest to Google Cloud Storage.",
CommandRun: func() subcommands.CommandRun {
c := &upCmd{}
c.Init(authOpts)
return c
},
}
}
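// isTransientError reports whether err is worth retrying: either it is
// tagged as transient by LUCI, or it is a server-side (HTTP 5xx) GCS error.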
func isTransientError(err error) bool {
var apiErr *googleapi.Error
return transient.Tag.In(err) || (errors.As(err, &apiErr) && apiErr.Code >= 500)
}
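// upCmd implements the "up" subcommand.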
type upCmd struct {
commonFlags
bucket string
namespace string
privateKeyPath string
manifestPath string
j int
logLevel logging.Level
}
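// Init registers the subcommand's flags.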
func (c *upCmd) Init(defaultAuthOpts auth.Options) {
c.commonFlags.Init(defaultAuthOpts)
c.Flags.StringVar(&c.bucket, "bucket", "", "Bucket to upload to.")
c.Flags.StringVar(&c.namespace, "namespace", "", "Namespace of non-deduplicated uploads relative to the root of the bucket.")
c.Flags.StringVar(&c.privateKeyPath, "pkey", "", "The path to an ED25519 private key encoded in the PKCS8 PEM format.\n"+
"This can, for example, be generated by \"openssl genpkey -algorithm ed25519\".\n"+
"If set, all images and build APIs will be signed and uploaded with their signatures\n"+
"in GCS metadata, and the corresponding public key will be uploaded as well.")
c.Flags.StringVar(&c.manifestPath, "manifest-path", "", "Upload manifest.")
	c.Flags.IntVar(&c.j, "j", 32, "Maximum number of concurrent upload goroutines.")
c.Flags.Var(&c.logLevel, "log-level", "Logging level. Can be debug, info, warning, or error.")
}
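// parseArgs validates the parsed flags; -bucket, -namespace, and
// -manifest-path are all required.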
func (c *upCmd) parseArgs() error {
if err := c.commonFlags.Parse(); err != nil {
return err
}
if c.bucket == "" {
return errors.New("-bucket is required")
}
if c.namespace == "" {
return errors.New("-namespace is required")
}
if c.manifestPath == "" {
return errors.New("-manifest-path is required")
}
return nil
}
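// Run implements subcommands.CommandRun. Transient failures and deadline
// expirations are mapped to exitTransientError so that callers may retry.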
func (c *upCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int {
if err := c.parseArgs(); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
return 1
}
if err := c.main(); err != nil {
fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err)
if isTransientError(err) || errors.Is(err, context.DeadlineExceeded) {
return exitTransientError
}
return 1
}
return 0
}
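// main reads and flattens the upload manifest, optionally signs the files,
// sets up authentication, and uploads everything to the configured bucket.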
func (c *upCmd) main() error {
ctx := context.Background()
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
defer cancel()
ctx = logging.SetLevel(ctx, c.logLevel)
ctx = gologger.StdConfig.Use(ctx)
	jsonInput, err := os.ReadFile(c.manifestPath)
if err != nil {
return err
}
var manifest []types.Upload
if err := json.Unmarshal(jsonInput, &manifest); err != nil {
return err
}
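	// For reference, the manifest is a JSON array of types.Upload entries.
	// A minimal sketch follows; the exact key names are defined by the JSON
	// tags on types.Upload, which should be treated as authoritative:
	//
	//	[
	//		{"source": "out/build-archive.tgz", "destination": "build-archive.tgz", "deduplicate": true},
	//		{"source": "out/images", "destination": "images", "recursive": true}
	//	]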
// Flatten any directories in the manifest to files.
files := []types.Upload{}
for _, upload := range manifest {
if len(upload.Source) == 0 {
files = append(files, upload)
continue
}
fileInfo, err := os.Stat(upload.Source)
if err != nil {
return err
}
if !fileInfo.IsDir() {
files = append(files, upload)
} else {
contents, err := dirToFiles(ctx, upload)
if err != nil {
return err
}
files = append(files, contents...)
}
}
pkey, err := lib.PrivateKey(c.privateKeyPath)
if err != nil {
return fmt.Errorf("failed to get private key: %w", err)
}
if pkey != nil {
publicKey, err := lib.PublicKeyUpload(pkey.Public().(ed25519.PublicKey))
if err != nil {
return err
}
files = append(files, *publicKey)
files, err = lib.Sign(files, pkey)
if err != nil {
return err
}
}
authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts)
tokenSource, err := authenticator.TokenSource()
if err != nil {
if err == auth.ErrLoginRequired {
fmt.Fprintf(os.Stderr, "You need to login first by running:\n")
fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " "))
}
return err
}
sink, err := newCloudSink(ctx, c.bucket, option.WithTokenSource(tokenSource))
if err != nil {
return err
}
defer sink.client.Close()
return uploadFiles(ctx, files, sink, c.j, c.namespace)
}
// dataSink is an abstract data sink, providing a mockable interface to
// cloudSink, the GCS-backed implementation below.
type dataSink interface {
	// objectExistsAt returns whether an object of that name exists within the
	// sink, along with its attributes if it does.
objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error)
	// write writes the content of an upload to a sink object at the given
	// name. If an object at that name does not exist, it will be created;
	// else it will be overwritten.
write(ctx context.Context, upload *types.Upload) error
}
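
// As an illustration of that mockability, a test could satisfy dataSink with
// an in-memory double and pass it to uploadFiles. The memSink below is a
// hypothetical sketch, not part of this tool; a fuller mock would also
// fabricate storage.ObjectAttrs (e.g. an MD5) and read upload.Source from
// disk.
type memSink struct {
	mu      sync.Mutex
	objects map[string][]byte
}

func (m *memSink) objectExistsAt(_ context.Context, name string) (bool, *storage.ObjectAttrs, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.objects[name]
	return ok, nil, nil
}

func (m *memSink) write(_ context.Context, upload *types.Upload) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.objects == nil {
		m.objects = map[string][]byte{}
	}
	// Only in-memory contents are recorded here.
	m.objects[upload.Destination] = upload.Contents
	return nil
}
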
// cloudSink is a GCS-backed data sink.
type cloudSink struct {
client *storage.Client
bucket *storage.BucketHandle
}
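// newCloudSink returns a cloudSink backed by a new GCS client for the given
// bucket.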
func newCloudSink(ctx context.Context, bucket string, opts ...option.ClientOption) (*cloudSink, error) {
client, err := storage.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
return &cloudSink{
client: client,
bucket: client.Bucket(bucket),
}, nil
}
func (s *cloudSink) objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error) {
attrs, err := lib.ObjectAttrs(ctx, s.bucket.Object(name))
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return false, nil, nil
}
return false, nil, err
}
	// If the MD5 is not set, treat this as a miss; the write() function will
	// handle the race.
return len(attrs.MD5) != 0, attrs, nil
}
// hasher is an io.Writer that passes writes through to an underlying writer
// while computing the MD5 of the bytes written.
type hasher struct {
h hash.Hash
w io.Writer
}
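// Write writes p to the underlying writer and hashes the bytes that were
// actually written, so the digest reflects exactly the data that reached the
// writer.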
func (h *hasher) Write(p []byte) (int, error) {
n, err := h.w.Write(p)
_, _ = h.h.Write(p[:n])
return n, err
}
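// write uploads to GCS with a does-not-exist precondition, optionally
// tar-wrapping and gzip-compressing the content on the fly, and then verifies
// that the remote MD5 matches the locally computed digest.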
func (s *cloudSink) write(ctx context.Context, upload *types.Upload) error {
var reader io.Reader
if upload.Source != "" {
f, err := os.Open(upload.Source)
if err != nil {
return err
}
defer f.Close()
reader = f
} else {
reader = bytes.NewBuffer(upload.Contents)
}
obj := s.bucket.Object(upload.Destination)
// Set timeouts to fail fast on unresponsive connections.
tctx, cancel := context.WithTimeout(ctx, perFileUploadTimeout)
defer cancel()
sw := obj.If(storage.Conditions{DoesNotExist: true}).NewWriter(tctx)
sw.ChunkSize = chunkSize
sw.ContentType = "application/octet-stream"
if upload.Compress {
sw.ContentEncoding = "gzip"
}
if upload.Metadata != nil {
sw.Metadata = upload.Metadata
}
// The CustomTime needs to be set to work with the lifecycle condition
// set on the GCS bucket.
sw.CustomTime = time.Now()
	// We optionally compress on the fly and calculate the MD5 of the
	// compressed data.
	// Writes happen asynchronously, so a nil error may be returned while the
	// write goes on to fail. As recommended in
	// https://godoc.org/cloud.google.com/go/storage#Writer.Write,
	// we return the value of Close() to detect whether the write succeeded.
	// Note that the gzip compressor must be closed before the storage writer
	// that it wraps.
h := &hasher{md5.New(), sw}
var writeErr, tarErr, zipErr error
if upload.Compress {
gzw := gzip.NewWriter(h)
if upload.TarHeader != nil {
tw := tar.NewWriter(gzw)
writeErr = tw.WriteHeader(upload.TarHeader)
if writeErr == nil {
_, writeErr = io.Copy(tw, reader)
}
tarErr = tw.Close()
} else {
_, writeErr = io.Copy(gzw, reader)
}
zipErr = gzw.Close()
} else {
_, writeErr = io.Copy(h, reader)
}
closeErr := sw.Close()
	// Keep the first error we encountered, and vet it for 'permissible' GCS
	// error states.
	// Note: consider an errorsmisc.FirstNonNil() helper if we see this logic
	// again.
err := writeErr
if err == nil {
err = tarErr
}
if err == nil {
err = zipErr
}
if err == nil {
err = closeErr
}
if err = checkGCSErr(ctx, err, upload.Destination); err != nil {
return err
}
	// Now confirm that the MD5 matches upstream, just in case. If the file was
	// uploaded by another client (a race condition), loop until the MD5 is set.
	// This guarantees that the file has been fully uploaded before this
	// function returns.
d := h.h.Sum(nil)
t := time.Second
const max = 30 * time.Second
for {
attrs, err := lib.ObjectAttrs(ctx, obj)
if err != nil {
return fmt.Errorf("failed to confirm MD5 for %s due to: %w", upload.Destination, err)
}
if len(attrs.MD5) == 0 {
time.Sleep(t)
if t += t / 2; t > max {
t = max
}
logging.Debugf(ctx, "waiting for MD5 for %s", upload.Destination)
continue
}
if !bytes.Equal(attrs.MD5, d) {
return md5MismatchError{fmt.Errorf("MD5 mismatch for %s; local: %x, remote: %x", upload.Destination, d, attrs.MD5)}
}
break
}
return nil
}
// TODO(fxbug.dev/78017): Delete this type once fixed.
type md5MismatchError struct {
error
}
// checkGCSErr validates the error for a GCS upload.
//
// If the precondition of the object not existing is not met on write (i.e.,
// at the time of the write the object is there), then the server will
// respond with a 412. (See
// https://cloud.google.com/storage/docs/json_api/v1/status-codes and
// https://tools.ietf.org/html/rfc7232#section-4.2.)
// We do not report this as an error, however, as the associated object might
// have been created by another job after we checked its non-existence - and we
// wish to be resilient in the event of such a race.
func checkGCSErr(ctx context.Context, err error, name string) error {
if err == nil || err == io.EOF {
return nil
}
if strings.Contains(err.Error(), "Error 412") {
logging.Debugf(ctx, "object %q: encountered recoverable race condition during upload, already exists remotely", name)
return nil
}
return err
}
// dirToFiles returns a list of the top-level files in the dir if dir.Recursive
// is false, else it returns all files in the dir.
func dirToFiles(ctx context.Context, dir types.Upload) ([]types.Upload, error) {
var files []types.Upload
var err error
var paths []string
if dir.Recursive {
err = filepath.Walk(dir.Source, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
relPath, err := filepath.Rel(dir.Source, path)
if err != nil {
return err
}
paths = append(paths, relPath)
}
return nil
})
} else {
		var entries []os.DirEntry
		entries, err = os.ReadDir(dir.Source)
		if err == nil {
			for _, entry := range entries {
				if entry.IsDir() {
					continue
				}
				paths = append(paths, entry.Name())
			}
		}
}
if err != nil {
return nil, err
}
for _, path := range paths {
files = append(files, types.Upload{
Source: filepath.Join(dir.Source, path),
Destination: filepath.Join(dir.Destination, path),
Deduplicate: dir.Deduplicate,
Signed: dir.Signed,
})
}
return files, nil
}
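// uploadFiles uploads files to dest with up to j concurrent workers.
// Non-deduplicated files are placed under namespace, and deduplicated objects
// nearing the end of their TTL are recorded in a listing that is uploaded for
// a post-processor to refresh.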
func uploadFiles(ctx context.Context, files []types.Upload, dest dataSink, j int, namespace string) error {
if j <= 0 {
		return errors.New("concurrency factor j must be a positive number")
}
uploads := make(chan types.Upload, j)
errs := make(chan error, j)
queueUploads := func() {
defer close(uploads)
for _, f := range files {
if len(f.Source) != 0 {
fileInfo, err := os.Stat(f.Source)
if err != nil {
errs <- err
return
}
mtime := strconv.FormatInt(fileInfo.ModTime().Unix(), 10)
if f.Metadata == nil {
f.Metadata = map[string]string{}
}
f.Metadata[googleReservedFileMtime] = mtime
}
uploads <- f
}
}
objsToRefreshTTL := make(chan string)
var wg sync.WaitGroup
wg.Add(j)
upload := func() {
defer wg.Done()
for upload := range uploads {
			// Files that are not deduplicated are uploaded under the dedicated
			// -namespace. The manifest may already set the namespace on the
			// destination; in that case, don't prepend it again.
if !upload.Deduplicate && !strings.HasPrefix(upload.Destination, namespace) {
upload.Destination = path.Join(namespace, upload.Destination)
}
exists, attrs, err := dest.objectExistsAt(ctx, upload.Destination)
if err != nil {
errs <- err
return
}
if exists {
logging.Debugf(ctx, "object %q: already exists remotely", upload.Destination)
if !upload.Deduplicate {
errs <- fmt.Errorf("object %q: collided", upload.Destination)
return
}
				// Queue objects whose CustomTime is older than
				// daysSinceCustomTime so that their timestamps can be refreshed.
if attrs != nil && time.Now().AddDate(0, 0, -daysSinceCustomTime).After(attrs.CustomTime) {
objsToRefreshTTL <- upload.Destination
}
continue
}
if err := uploadFile(ctx, upload, dest); err != nil {
// If deduplicated file already exists remotely but the local
// and remote md5 hashes don't match, upload our version to a
// namespaced path so it can be compared with the
// already-existent version to help with debugging.
// TODO(fxbug.dev/78017): Delete this logic once fixed.
var md5Err md5MismatchError
if errors.As(err, &md5Err) {
upload.Destination = path.Join(namespace, "md5-mismatches", upload.Destination)
if err := uploadFile(ctx, upload, dest); err != nil {
logging.Warningf(ctx, "failed to upload md5-mismatch file %q for debugging: %s", upload.Destination, err)
}
}
errs <- err
return
}
}
}
go queueUploads()
for i := 0; i < j; i++ {
go upload()
}
go func() {
wg.Wait()
close(errs)
close(objsToRefreshTTL)
}()
var objs []string
for o := range objsToRefreshTTL {
objs = append(objs, o)
}
if err := <-errs; err != nil {
return err
}
// Upload a file listing all the deduplicated files that already existed in
// the upload destination. A post-processor will use this file to update the
// CustomTime of the objects and extend their TTL.
if len(objs) > 0 {
objsToRefreshTTLUpload := types.Upload{
Contents: []byte(strings.Join(objs, "\n")),
Destination: path.Join(namespace, objsToRefreshTTLTxt),
}
return uploadFile(ctx, objsToRefreshTTLUpload, dest)
}
return nil
}
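// uploadFile writes a single upload to dest, retrying transient failures via
// lib.Retry.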
func uploadFile(ctx context.Context, upload types.Upload, dest dataSink) error {
logging.Debugf(ctx, "object %q: attempting creation", upload.Destination)
if err := lib.Retry(ctx, func() error {
err := dest.write(ctx, &upload)
if err != nil {
logging.Warningf(ctx, "error uploading %q: %s", upload.Destination, err)
}
return err
}); err != nil {
return fmt.Errorf("%s: %w", upload.Destination, err)
}
logging.Infof(ctx, "object %q: created", upload.Destination)
return nil
}