| // Copyright 2022 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "archive/tar" |
| "bytes" |
| "compress/gzip" |
| "context" |
| "crypto/ed25519" |
| "crypto/md5" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "hash" |
| "io" |
| "io/ioutil" |
| "os" |
| "os/signal" |
| "path" |
| "path/filepath" |
| "strconv" |
| "strings" |
| "sync" |
| "syscall" |
| "time" |
| |
| "cloud.google.com/go/storage" |
| "github.com/maruel/subcommands" |
| "go.chromium.org/luci/auth" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/logging/gologger" |
| "go.chromium.org/luci/common/retry/transient" |
| "google.golang.org/api/googleapi" |
| |
| "go.fuchsia.dev/infra/cmd/gcs-util/lib" |
| "go.fuchsia.dev/infra/cmd/gcs-util/types" |
| "google.golang.org/api/option" |
| ) |
| |
| func cmdUp(authOpts auth.Options) *subcommands.Command { |
| return &subcommands.Command{ |
| UsageLine: "up -bucket <bucket> -namespace <namespace> -manifest-path <manifest-path>", |
| ShortDesc: "Upload files from an input manifest to Google Cloud Storage.", |
| LongDesc: "Upload files from an input manifest to Google Cloud Storage.", |
| CommandRun: func() subcommands.CommandRun { |
| c := &upCmd{} |
| c.Init(authOpts) |
| return c |
| }, |
| } |
| } |
| |
| func isTransientError(err error) bool { |
| var apiErr *googleapi.Error |
| return transient.Tag.In(err) || (errors.As(err, &apiErr) && apiErr.Code >= 500) |
| } |
| |
// upCmd holds the parsed flags for the "up" subcommand; see cmdUp and Init
// for the flag definitions.
type upCmd struct {
	commonFlags

	// bucket is the destination GCS bucket (-bucket).
	bucket string
	// namespace prefixes non-deduplicated uploads within the bucket (-namespace).
	namespace string
	// privateKeyPath optionally points at a PKCS8 PEM ED25519 key (-pkey).
	privateKeyPath string
	// manifestPath is the JSON upload manifest to read (-manifest-path).
	manifestPath string
	// j caps the number of concurrent upload workers (-j).
	j int
	// logLevel controls logging verbosity (-log-level).
	logLevel logging.Level
}
| |
| func (c *upCmd) Init(defaultAuthOpts auth.Options) { |
| c.commonFlags.Init(defaultAuthOpts) |
| c.Flags.StringVar(&c.bucket, "bucket", "", "Bucket to upload to.") |
| c.Flags.StringVar(&c.namespace, "namespace", "", "Namespace of non-deduplicated uploads relative to the root of the bucket.") |
| c.Flags.StringVar(&c.privateKeyPath, "pkey", "", "The path to an ED25519 private key encoded in the PKCS8 PEM format.\n"+ |
| "This can, for example, be generated by \"openssl genpkey -algorithm ed25519\".\n"+ |
| "If set, all images and build APIs will be signed and uploaded with their signatures\n"+ |
| "in GCS metadata, and the corresponding public key will be uploaded as well.") |
| c.Flags.StringVar(&c.manifestPath, "manifest-path", "", "Upload manifest.") |
| c.Flags.IntVar(&c.j, "j", 32, "Maximum number of concurrent uploading processes.") |
| c.Flags.Var(&c.logLevel, "log-level", "Logging level. Can be debug, info, warning, or error.") |
| } |
| |
| func (c *upCmd) parseArgs() error { |
| if err := c.commonFlags.Parse(); err != nil { |
| return err |
| } |
| if c.bucket == "" { |
| return errors.New("-bucket is required") |
| } |
| if c.namespace == "" { |
| return errors.New("-namespace is required") |
| } |
| if c.manifestPath == "" { |
| return errors.New("-manifest-path is required") |
| } |
| return nil |
| } |
| |
| func (c *upCmd) Run(a subcommands.Application, _ []string, _ subcommands.Env) int { |
| if err := c.parseArgs(); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| return 1 |
| } |
| |
| if err := c.main(); err != nil { |
| fmt.Fprintf(a.GetErr(), "%s: %s\n", a.GetName(), err) |
| if isTransientError(err) || errors.Is(err, context.DeadlineExceeded) { |
| return exitTransientError |
| } |
| return 1 |
| } |
| return 0 |
| } |
| |
| func (c *upCmd) main() error { |
| ctx := context.Background() |
| ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM) |
| defer cancel() |
| ctx = logging.SetLevel(ctx, c.logLevel) |
| ctx = gologger.StdConfig.Use(ctx) |
| |
| jsonInput, err := ioutil.ReadFile(c.manifestPath) |
| if err != nil { |
| return err |
| } |
| var manifest []types.Upload |
| if err := json.Unmarshal(jsonInput, &manifest); err != nil { |
| return err |
| } |
| |
| // Flatten any directories in the manifest to files. |
| files := []types.Upload{} |
| for _, upload := range manifest { |
| if len(upload.Source) == 0 { |
| files = append(files, upload) |
| continue |
| } |
| fileInfo, err := os.Stat(upload.Source) |
| if err != nil { |
| return err |
| } |
| if !fileInfo.IsDir() { |
| files = append(files, upload) |
| } else { |
| contents, err := dirToFiles(ctx, upload) |
| if err != nil { |
| return err |
| } |
| files = append(files, contents...) |
| } |
| } |
| |
| pkey, err := lib.PrivateKey(c.privateKeyPath) |
| if err != nil { |
| return fmt.Errorf("failed to get private key: %w", err) |
| } |
| if pkey != nil { |
| publicKey, err := lib.PublicKeyUpload(pkey.Public().(ed25519.PublicKey)) |
| if err != nil { |
| return err |
| } |
| files = append(files, *publicKey) |
| files, err = lib.Sign(files, pkey) |
| if err != nil { |
| return err |
| } |
| } |
| |
| authenticator := auth.NewAuthenticator(ctx, auth.OptionalLogin, c.parsedAuthOpts) |
| tokenSource, err := authenticator.TokenSource() |
| if err != nil { |
| if err == auth.ErrLoginRequired { |
| fmt.Fprintf(os.Stderr, "You need to login first by running:\n") |
| fmt.Fprintf(os.Stderr, " luci-auth login -scopes %q\n", strings.Join(c.parsedAuthOpts.Scopes, " ")) |
| } |
| return err |
| } |
| |
| sink, err := newCloudSink(ctx, c.bucket, option.WithTokenSource(tokenSource)) |
| if err != nil { |
| return err |
| } |
| defer sink.client.Close() |
| return uploadFiles(ctx, files, sink, c.j, c.namespace) |
| } |
| |
// dataSink is an abstract data sink, providing a mockable interface to
// cloudSink, the GCS-backed implementation below.
type dataSink interface {

	// objectExistsAt returns whether an object of that name exists within
	// the sink, along with the object's attributes when it does.
	objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error)

	// write writes the content of a reader to a sink object at the given name.
	// If an object at that name does not exist, it will be created; else it
	// will be overwritten.
	write(ctx context.Context, upload *types.Upload) error
}
| |
// cloudSink is a GCS-backed data sink implementing dataSink.
type cloudSink struct {
	// client owns the underlying connection; callers of newCloudSink are
	// responsible for closing it.
	client *storage.Client
	// bucket is the handle for the destination bucket within client.
	bucket *storage.BucketHandle
}
| |
| func newCloudSink(ctx context.Context, bucket string, opts ...option.ClientOption) (*cloudSink, error) { |
| client, err := storage.NewClient(ctx, opts...) |
| if err != nil { |
| return nil, err |
| } |
| return &cloudSink{ |
| client: client, |
| bucket: client.Bucket(bucket), |
| }, nil |
| } |
| |
| func (s *cloudSink) objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error) { |
| attrs, err := lib.ObjectAttrs(ctx, s.bucket.Object(name)) |
| if err != nil { |
| if errors.Is(err, storage.ErrObjectNotExist) { |
| return false, nil, nil |
| } |
| return false, nil, err |
| } |
| // Check if MD5 is not set, mark this as a miss, then write() function will |
| // handle the race. |
| return len(attrs.MD5) != 0, attrs, nil |
| } |
| |
| // hasher is a io.Writer that calculates the MD5. |
| type hasher struct { |
| h hash.Hash |
| w io.Writer |
| } |
| |
| func (h *hasher) Write(p []byte) (int, error) { |
| n, err := h.w.Write(p) |
| _, _ = h.h.Write(p[:n]) |
| return n, err |
| } |
| |
| func (s *cloudSink) write(ctx context.Context, upload *types.Upload) error { |
| var reader io.Reader |
| if upload.Source != "" { |
| f, err := os.Open(upload.Source) |
| if err != nil { |
| return err |
| } |
| defer f.Close() |
| reader = f |
| } else { |
| reader = bytes.NewBuffer(upload.Contents) |
| } |
| |
| obj := s.bucket.Object(upload.Destination) |
| // Set timeouts to fail fast on unresponsive connections. |
| tctx, cancel := context.WithTimeout(ctx, perFileUploadTimeout) |
| defer cancel() |
| sw := obj.If(storage.Conditions{DoesNotExist: true}).NewWriter(tctx) |
| sw.ChunkSize = chunkSize |
| sw.ContentType = "application/octet-stream" |
| if upload.Compress { |
| sw.ContentEncoding = "gzip" |
| } |
| if upload.Metadata != nil { |
| sw.Metadata = upload.Metadata |
| } |
| // The CustomTime needs to be set to work with the lifecycle condition |
| // set on the GCS bucket. |
| sw.CustomTime = time.Now() |
| |
| // We optionally compress on the fly, and calculate the MD5 on the |
| // compressed data. |
| // Writes happen asynchronously, and so a nil may be returned while the write |
| // goes on to fail. It is recommended in |
| // https://godoc.org/cloud.google.com/go/storage#Writer.Write |
| // to return the value of Close() to detect the success of the write. |
| // Note that a gzip compressor would need to be closed before the storage |
| // writer that it wraps is. |
| h := &hasher{md5.New(), sw} |
| var writeErr, tarErr, zipErr error |
| if upload.Compress { |
| gzw := gzip.NewWriter(h) |
| if upload.TarHeader != nil { |
| tw := tar.NewWriter(gzw) |
| writeErr = tw.WriteHeader(upload.TarHeader) |
| if writeErr == nil { |
| _, writeErr = io.Copy(tw, reader) |
| } |
| tarErr = tw.Close() |
| } else { |
| _, writeErr = io.Copy(gzw, reader) |
| } |
| zipErr = gzw.Close() |
| } else { |
| _, writeErr = io.Copy(h, reader) |
| } |
| closeErr := sw.Close() |
| |
| // Keep the first error we encountered - and vet it for 'permissable' GCS |
| // error states. |
| // Note: consider an errorsmisc.FirstNonNil() helper if see this logic again. |
| err := writeErr |
| if err == nil { |
| err = tarErr |
| } |
| if err == nil { |
| err = zipErr |
| } |
| if err == nil { |
| err = closeErr |
| } |
| if err = checkGCSErr(ctx, err, upload.Destination); err != nil { |
| return err |
| } |
| |
| // Now confirm that the MD5 matches upstream, just in case. If the file was |
| // uploaded by another client (a race condition), loop until the MD5 is set. |
| // This guarantees that the file is properly uploaded before this function |
| // quits. |
| d := h.h.Sum(nil) |
| t := time.Second |
| const max = 30 * time.Second |
| for { |
| attrs, err := lib.ObjectAttrs(ctx, obj) |
| if err != nil { |
| return fmt.Errorf("failed to confirm MD5 for %s due to: %w", upload.Destination, err) |
| } |
| if len(attrs.MD5) == 0 { |
| time.Sleep(t) |
| if t += t / 2; t > max { |
| t = max |
| } |
| logging.Debugf(ctx, "waiting for MD5 for %s", upload.Destination) |
| continue |
| } |
| if !bytes.Equal(attrs.MD5, d) { |
| return md5MismatchError{fmt.Errorf("MD5 mismatch for %s; local: %x, remote: %x", upload.Destination, d, attrs.MD5)} |
| } |
| break |
| } |
| return nil |
| } |
| |
// md5MismatchError wraps the error produced when the locally computed MD5
// disagrees with the digest GCS reports for an uploaded object; uploadFiles
// uses errors.As on it to trigger the debugging re-upload path.
// TODO(fxbug.dev/78017): Delete this type once fixed.
type md5MismatchError struct {
	error
}
| |
| // checkGCSErr validates the error for a GCS upload. |
| // |
| // If the precondition of the object not existing is not met on write (i.e., |
| // at the time of the write the object is there), then the server will |
| // respond with a 412. (See |
| // https://cloud.google.com/storage/docs/json_api/v1/status-codes and |
| // https://tools.ietf.org/html/rfc7232#section-4.2.) |
| // We do not report this as an error, however, as the associated object might |
| // have been created by another job after we checked its non-existence - and we |
| // wish to be resilient in the event of such a race. |
| func checkGCSErr(ctx context.Context, err error, name string) error { |
| if err == nil || err == io.EOF { |
| return nil |
| } |
| if strings.Contains(err.Error(), "Error 412") { |
| logging.Debugf(ctx, "object %q: encountered recoverable race condition during upload, already exists remotely", name) |
| return nil |
| } |
| return err |
| } |
| |
| // dirToFiles returns a list of the top-level files in the dir if dir.Recursive |
| // is false, else it returns all files in the dir. |
| func dirToFiles(ctx context.Context, dir types.Upload) ([]types.Upload, error) { |
| var files []types.Upload |
| var err error |
| var paths []string |
| if dir.Recursive { |
| err = filepath.Walk(dir.Source, func(path string, info os.FileInfo, err error) error { |
| if err != nil { |
| return err |
| } |
| if !info.IsDir() { |
| relPath, err := filepath.Rel(dir.Source, path) |
| if err != nil { |
| return err |
| } |
| paths = append(paths, relPath) |
| } |
| return nil |
| }) |
| } else { |
| var entries []os.FileInfo |
| entries, err = ioutil.ReadDir(dir.Source) |
| if err == nil { |
| for _, fi := range entries { |
| if fi.IsDir() { |
| continue |
| } |
| paths = append(paths, fi.Name()) |
| } |
| } |
| } |
| if err != nil { |
| return nil, err |
| } |
| for _, path := range paths { |
| files = append(files, types.Upload{ |
| Source: filepath.Join(dir.Source, path), |
| Destination: filepath.Join(dir.Destination, path), |
| Deduplicate: dir.Deduplicate, |
| Signed: dir.Signed, |
| }) |
| } |
| return files, nil |
| } |
| |
// uploadFiles uploads every entry in files to dest using up to j concurrent
// worker goroutines. Files not marked Deduplicate are placed under the
// namespace prefix. If any deduplicated objects already exist remotely with
// an old CustomTime, a listing of them is uploaded at the end so a
// post-processor can refresh their TTL. Returns the first error observed.
func uploadFiles(ctx context.Context, files []types.Upload, dest dataSink, j int, namespace string) error {
	if j <= 0 {
		return fmt.Errorf("Concurrency factor j must be a positive number")
	}

	uploads := make(chan types.Upload, j)
	// Buffered so each of the j workers can report one error and exit
	// without blocking.
	// NOTE(review): queueUploads may also send here; if all j workers have
	// already filled the buffer, its send could block past close(errs) —
	// confirm this path is unreachable in practice.
	errs := make(chan error, j)

	// queueUploads feeds manifest entries into the uploads channel, stamping
	// each file-backed entry with its mtime so GCS preserves it in metadata.
	queueUploads := func() {
		defer close(uploads)
		for _, f := range files {
			if len(f.Source) != 0 {
				fileInfo, err := os.Stat(f.Source)
				if err != nil {
					errs <- err
					return
				}
				mtime := strconv.FormatInt(fileInfo.ModTime().Unix(), 10)
				if f.Metadata == nil {
					f.Metadata = map[string]string{}
				}
				f.Metadata[googleReservedFileMtime] = mtime
			}
			uploads <- f
		}
	}

	// Unbuffered: workers hand object names directly to the collection loop
	// below, which runs until the channel is closed after wg.Wait().
	objsToRefreshTTL := make(chan string)
	var wg sync.WaitGroup
	wg.Add(j)
	// upload is the worker body: drain the uploads channel, skipping
	// already-present deduplicated objects and uploading everything else.
	upload := func() {
		defer wg.Done()
		for upload := range uploads {
			// Files which are not deduplicated are uploaded to the dedicated
			// -namespace. The manifest may already set the namespace, and in
			// that case, don't try to prepend it.
			if !upload.Deduplicate && !strings.HasPrefix(upload.Destination, namespace) {
				upload.Destination = path.Join(namespace, upload.Destination)
			}
			exists, attrs, err := dest.objectExistsAt(ctx, upload.Destination)
			if err != nil {
				errs <- err
				return
			}
			if exists {
				logging.Debugf(ctx, "object %q: already exists remotely", upload.Destination)
				if !upload.Deduplicate {
					// A namespaced (non-deduplicated) object should never
					// pre-exist; treat it as a collision.
					errs <- fmt.Errorf("object %q: collided", upload.Destination)
					return
				}
				// Add objects to update timestamps for that are older than
				// daysSinceCustomTime.
				if attrs != nil && time.Now().AddDate(0, 0, -daysSinceCustomTime).After(attrs.CustomTime) {
					objsToRefreshTTL <- upload.Destination
				}
				continue
			}

			if err := uploadFile(ctx, upload, dest); err != nil {
				// If deduplicated file already exists remotely but the local
				// and remote md5 hashes don't match, upload our version to a
				// namespaced path so it can be compared with the
				// already-existent version to help with debugging.
				// TODO(fxbug.dev/78017): Delete this logic once fixed.
				var md5Err md5MismatchError
				if errors.As(err, &md5Err) {
					upload.Destination = path.Join(namespace, "md5-mismatches", upload.Destination)
					if err := uploadFile(ctx, upload, dest); err != nil {
						logging.Warningf(ctx, "failed to upload md5-mismatch file %q for debugging: %s", upload.Destination, err)
					}
				}
				errs <- err
				return
			}
		}
	}

	go queueUploads()
	for i := 0; i < j; i++ {
		go upload()
	}
	// Close both result channels once every worker has exited, so the
	// collection loop and the error read below terminate.
	go func() {
		wg.Wait()
		close(errs)
		close(objsToRefreshTTL)
	}()
	var objs []string
	for o := range objsToRefreshTTL {
		objs = append(objs, o)
	}

	// Receiving from the closed errs channel yields nil when no worker
	// reported an error; otherwise this surfaces the first error sent.
	if err := <-errs; err != nil {
		return err
	}
	// Upload a file listing all the deduplicated files that already existed in
	// the upload destination. A post-processor will use this file to update the
	// CustomTime of the objects and extend their TTL.
	if len(objs) > 0 {
		objsToRefreshTTLUpload := types.Upload{
			Contents:    []byte(strings.Join(objs, "\n")),
			Destination: path.Join(namespace, objsToRefreshTTLTxt),
		}
		return uploadFile(ctx, objsToRefreshTTLUpload, dest)
	}
	return nil
}
| |
| func uploadFile(ctx context.Context, upload types.Upload, dest dataSink) error { |
| logging.Debugf(ctx, "object %q: attempting creation", upload.Destination) |
| if err := lib.Retry(ctx, func() error { |
| err := dest.write(ctx, &upload) |
| if err != nil { |
| logging.Warningf(ctx, "error uploading %q: %s", upload.Destination, err) |
| } |
| return err |
| }); err != nil { |
| return fmt.Errorf("%s: %w", upload.Destination, err) |
| } |
| logging.Infof(ctx, "object %q: created", upload.Destination) |
| return nil |
| } |