// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"crypto/ed25519"
"crypto/md5"
"encoding/json"
"errors"
"flag"
"fmt"
"hash"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/storage"
"github.com/google/subcommands"
"google.golang.org/api/googleapi"
"go.fuchsia.dev/fuchsia/tools/artifactory"
"go.fuchsia.dev/fuchsia/tools/build"
"go.fuchsia.dev/fuchsia/tools/lib/gcsutil"
"go.fuchsia.dev/fuchsia/tools/lib/logger"
)
const (
// The exit code emitted by the `up` command when it fails due to a
// transient GCS error.
exitTransientError = 3
// The chunk size, in bytes, used when uploading files to GCS.
chunkSize = 100 * 1024 * 1024
// Relative path within the build directory to the repo produced by a build.
repoSubpath = "amber-files"
// Names of the repository metadata, key, blob, and target directories within a repo.
metadataDirName = "repository"
keyDirName = "keys"
blobDirName = "blobs"
targetDirName = "targets"
// Names of directories to be uploaded to in GCS.
assemblyInputArchivesDirName = "assembly"
buildsDirName = "builds"
buildAPIDirName = "build_api"
buildidDirName = "buildid"
debugDirName = "debug"
hostTestDirName = "host_tests"
imageDirName = "images"
packageDirName = "packages"
sdkArchivesDirName = "sdk"
toolDirName = "tools"
// A record of all of the fuchsia debug symbols processed.
// This is eventually consumed by crash reporting infrastructure.
// TODO(fxbug.dev/75356): Have the crash reporting infrastructure
// consume build-ids.json instead.
buildIDsTxt = "build-ids.txt"
// A mapping of build IDs to binary labels.
buildIDsToLabelsManifestName = "build-ids.json"
// The blobs manifest. TODO(fxbug.dev/60322) remove this.
blobManifestName = "blobs.json"
// A list of all Public Platform Surface Areas.
ctsPlasaReportName = "test_coverage_report.plasa.json"
// A list of the objects that need their TTL refreshed.
objsToRefreshTTLTxt = "objs_to_refresh_ttl.txt"
// The number of days since the CustomTime of an object after which the
// TTL should be refreshed. This is an arbitrary value chosen to avoid
// refreshing the TTL of frequently deduplicated objects repeatedly over
// a short time period.
daysSinceCustomTime = 10
// The ELF sizes manifest.
elfSizesManifestName = "elf_sizes.json"
// A mapping of FIDL mangled names to API functions.
fidlMangledToApiMappingManifestName = "fidl_mangled_to_api_mapping.json"
// Timeout for every file upload.
perFileUploadTimeout = 8 * time.Minute
// Metadata keys.
googleReservedFileMtime = "goog-reserved-file-mtime"
)
type upCommand struct {
// GCS bucket to which build artifacts will be uploaded.
gcsBucket string
// Unique namespace under which to index artifacts.
namespace string
// The maximum number of concurrent uploading routines.
j int
// An ED25519 private key encoded in the PKCS8 PEM format.
privateKeyPath string
// Whether or not to upload host tests.
uploadHostTests bool
}
func (upCommand) Name() string { return "up" }
func (upCommand) Synopsis() string { return "upload artifacts from a build to Google Cloud Storage" }
func (upCommand) Usage() string {
return `
artifactory up -bucket $GCS_BUCKET -namespace $NAMESPACE <build directory>
Uploads artifacts from a build to $GCS_BUCKET with the following structure:
├── $GCS_BUCKET
│ │ ├── assembly
│ │ │ └── <assembly input archives>
│ │ ├── blobs
│ │ │ └── <blob names>
│ │ ├── debug
│ │ │ └── <debug binaries in zxdb format>
│ │ ├── buildid
│ │ │ └── <debug binaries in debuginfod format>
│ │ ├── builds
│ │ │ ├── $NAMESPACE
│ │ │ │ ├── build-ids.json
│ │ │ │ ├── build-ids.txt
│ │ │ │ ├── jiri.snapshot
│ │ │ │ ├── objs_to_refresh_ttl.txt
│ │ │ │ ├── publickey.pem
│ │ │ │ ├── images
│ │ │ │ │ └── <images>
│ │ │ │ ├── packages
│ │ │ │ │ ├── all_blobs.json
│ │ │ │ │ ├── blobs.json
│ │ │ │ │ ├── elf_sizes.json
│ │ │ │ │ ├── repository
│ │ │ │ │ │ ├── targets
│ │ │ │ │ │ │ └── <package repo target files>
│ │ │ │ │ │ └── <package repo metadata files>
│ │ │ │ │ └── keys
│ │ │ │ │ └── <package repo keys>
│ │ │ │ ├── sdk
│ │ │ │ │ ├── <host-independent SDK archives>
│ │ │ │ │ └── <OS-CPU>
│ │ │ │ │ └── <host-specific SDK archives>
│ │ │ │ ├── build_api
│ │ │ │ │ └── <build API module JSON>
│ │ │ │ ├── host_tests
│ │ │ │ │ └── <host tests and deps, same hierarchy as build dir>
│ │ │ │ ├── tools
│ │ │ │ │ └── <OS>-<CPU>
│ │ │ │ │ └── <tool names>
flags:
`
}
func (cmd *upCommand) SetFlags(f *flag.FlagSet) {
f.StringVar(&cmd.gcsBucket, "bucket", "", "GCS bucket to which artifacts will be uploaded")
f.StringVar(&cmd.namespace, "namespace", "", "Namespace under which to index uploaded artifacts")
f.IntVar(&cmd.j, "j", 32, "maximum number of concurrent uploading processes")
f.StringVar(&cmd.privateKeyPath, "pkey", "", "The path to an ED25519 private key encoded in the PKCS8 PEM format.\n"+
"This can, for example, be generated by \"openssl genpkey -algorithm ed25519\".\n"+
"If set, all images and build APIs will be signed and uploaded with their signatures\n"+
"in GCS metadata, and the corresponding public key will be uploaded as well.")
f.BoolVar(&cmd.uploadHostTests, "upload-host-tests", false, "whether or not to upload host tests and their runtime deps.")
}
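// isTransientError reports whether err is likely transient: either a
// gcsutil.TransientError or a googleapi.Error with a 5xx status code.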
func isTransientError(err error) bool {
_, transient := err.(gcsutil.TransientError)
var apiErr *googleapi.Error
return transient || (errors.As(err, &apiErr) && apiErr.Code >= 500)
}
func (cmd upCommand) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
args := f.Args()
if len(args) != 1 {
logger.Errorf(ctx, "exactly one positional argument expected: the build directory root")
return subcommands.ExitFailure
}
if err := cmd.execute(ctx, args[0]); err != nil {
logger.Errorf(ctx, "%v", err)
// Use a different exit code if the failure is a (likely transient)
// server error so the infrastructure knows to consider it as an infra
// failure.
if isTransientError(err) || errors.Is(err, context.DeadlineExceeded) {
return exitTransientError
}
return subcommands.ExitFailure
}
return subcommands.ExitSuccess
}
func (cmd upCommand) execute(ctx context.Context, buildDir string) error {
if cmd.gcsBucket == "" {
return fmt.Errorf("-bucket is required")
}
if cmd.namespace == "" {
return fmt.Errorf("-namespace is required")
}
m, err := build.NewModules(buildDir)
if err != nil {
return err
}
sink, err := newCloudSink(ctx, cmd.gcsBucket)
if err != nil {
return err
}
defer sink.client.Close()
repo := path.Join(buildDir, repoSubpath)
metadataDir := path.Join(repo, metadataDirName)
keyDir := path.Join(repo, keyDirName)
blobDir := path.Join(metadataDir, blobDirName)
targetDir := path.Join(metadataDir, targetDirName)
buildsNamespaceDir := path.Join(buildsDirName, cmd.namespace)
packageNamespaceDir := path.Join(buildsNamespaceDir, packageDirName)
imageNamespaceDir := path.Join(buildsNamespaceDir, imageDirName)
dirs := []artifactory.Upload{
{
Source: blobDir,
Destination: blobDirName,
Deduplicate: true,
},
{
Source: metadataDir,
Destination: path.Join(packageNamespaceDir, metadataDirName),
Deduplicate: false,
},
{
Source: keyDir,
Destination: path.Join(packageNamespaceDir, keyDirName),
Deduplicate: false,
},
{
Source: targetDir,
Destination: path.Join(packageNamespaceDir, metadataDirName, targetDirName),
Deduplicate: false,
Recursive: true,
},
}
files := []artifactory.Upload{
{
Source: path.Join(buildDir, blobManifestName),
Destination: path.Join(packageNamespaceDir, blobManifestName),
},
{
Source: path.Join(buildDir, elfSizesManifestName),
Destination: path.Join(packageNamespaceDir, elfSizesManifestName),
},
// Used for CTS test coverage.
{
Source: path.Join(buildDir, fidlMangledToApiMappingManifestName),
Destination: path.Join(buildsNamespaceDir, fidlMangledToApiMappingManifestName),
},
{
Source: path.Join(buildDir, ctsPlasaReportName),
Destination: path.Join(buildsNamespaceDir, ctsPlasaReportName),
},
}
allBlobsUpload, err := artifactory.BlobsUpload(m, path.Join(packageNamespaceDir, "all_blobs.json"))
if err != nil {
return fmt.Errorf("failed to obtain blobs upload: %w", err)
}
files = append(files, allBlobsUpload)
pkey, err := artifactory.PrivateKey(cmd.privateKeyPath)
if err != nil {
return fmt.Errorf("failed to get private key: %w", err)
}
// Sign the images for release builds.
images, err := artifactory.ImageUploads(m, imageNamespaceDir)
if err != nil {
return err
}
if pkey != nil {
images, err = artifactory.Sign(images, pkey)
if err != nil {
return err
}
}
files = append(files, images...)
productBundle, err := artifactory.ProductBundleUploads(m, packageNamespaceDir, blobDirName, imageNamespaceDir)
if err != nil {
return err
}
// Check that the upload isn't nil, as a product bundle doesn't exist for "bringup" and SDK builds.
if productBundle != nil {
files = append(files, *productBundle)
}
// Sign the build_info.json for release builds.
buildAPIs := artifactory.BuildAPIModuleUploads(m, path.Join(buildsNamespaceDir, buildAPIDirName))
if pkey != nil {
buildAPIs, err = artifactory.Sign(buildAPIs, pkey)
if err != nil {
return err
}
}
files = append(files, buildAPIs...)
assemblyInputArchives := artifactory.AssemblyInputArchiveUploads(m, path.Join(buildsNamespaceDir, assemblyInputArchivesDirName))
files = append(files, assemblyInputArchives...)
sdkArchives := artifactory.SDKArchiveUploads(m, path.Join(buildsNamespaceDir, sdkArchivesDirName))
files = append(files, sdkArchives...)
// Sign the tools for release builds.
tools := artifactory.ToolUploads(m, path.Join(buildsNamespaceDir, toolDirName))
if pkey != nil {
tools, err = artifactory.Sign(tools, pkey)
if err != nil {
return err
}
}
files = append(files, tools...)
debugBinaries, buildIDsToLabels, buildIDs, err := artifactory.DebugBinaryUploads(ctx, m, debugDirName, buildidDirName)
if err != nil {
return err
}
files = append(files, debugBinaries...)
buildIDManifest, err := createBuildIDManifest(buildIDs)
if err != nil {
return err
}
defer os.Remove(buildIDManifest)
files = append(files, artifactory.Upload{
Source: buildIDManifest,
Destination: path.Join(buildsNamespaceDir, buildIDsTxt),
})
buildIDsToLabelsJSON, err := json.MarshalIndent(buildIDsToLabels, "", " ")
if err != nil {
return err
}
files = append(files, artifactory.Upload{
Contents: buildIDsToLabelsJSON,
Destination: path.Join(buildsNamespaceDir, buildIDsToLabelsManifestName),
})
snapshot, err := artifactory.JiriSnapshotUpload(m, buildsNamespaceDir)
if err != nil {
return err
}
files = append(files, *snapshot)
if pkey != nil {
publicKey, err := artifactory.PublicKeyUpload(buildsNamespaceDir, pkey.Public().(ed25519.PublicKey))
if err != nil {
return err
}
files = append(files, *publicKey)
}
if cmd.uploadHostTests {
hostTests, err := artifactory.HostTestUploads(m.TestSpecs(), m.BuildDir(), path.Join(buildsNamespaceDir, hostTestDirName))
if err != nil {
return fmt.Errorf("failed to get host test files: %v", err)
}
files = append(files, hostTests...)
}
for _, dir := range dirs {
contents, err := dirToFiles(ctx, dir)
if err != nil {
return err
}
files = append(files, contents...)
}
return uploadFiles(ctx, files, sink, cmd.j, buildsNamespaceDir)
}
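// createBuildIDManifest writes the given build IDs, newline-separated, to a
// temporary file and returns the file's path. The caller is responsible for
// removing the file.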
func createBuildIDManifest(buildIDs []string) (string, error) {
manifest, err := ioutil.TempFile("", buildIDsTxt)
if err != nil {
return "", err
}
defer manifest.Close()
_, err = io.WriteString(manifest, strings.Join(buildIDs, "\n"))
return manifest.Name(), err
}
// dataSink is an abstract data sink, providing a mockable interface to
// cloudSink, the GCS-backed implementation below.
type dataSink interface {
// objectExistsAt returns whether an object of that name exists within the
// sink, along with its attributes if it does.
objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error)
// write writes the contents of an upload to a sink object at the upload's
// destination. If an object at that name does not exist, it will be created;
// else it will be overwritten.
write(ctx context.Context, upload *artifactory.Upload) error
}
// cloudSink is a GCS-backed data sink.
type cloudSink struct {
client *storage.Client
bucket *storage.BucketHandle
}
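// newCloudSink returns a cloudSink that writes to the named GCS bucket.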
func newCloudSink(ctx context.Context, bucket string) (*cloudSink, error) {
client, err := storage.NewClient(ctx)
if err != nil {
return nil, err
}
return &cloudSink{
client: client,
bucket: client.Bucket(bucket),
}, nil
}
func (s *cloudSink) objectExistsAt(ctx context.Context, name string) (bool, *storage.ObjectAttrs, error) {
attrs, err := gcsutil.ObjectAttrs(ctx, s.bucket.Object(name))
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return false, nil, nil
}
return false, nil, err
}
// If the MD5 is not set, treat this as a miss; the write() function will
// then handle the race.
return len(attrs.MD5) != 0, attrs, nil
}
// hasher is an io.Writer that forwards writes to w while hashing the bytes
// that were successfully written.
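//
// A hypothetical usage sketch (not part of the upload path), assuming dst is
// an io.Writer and src is an io.Reader:
//
//	h := &hasher{md5.New(), dst}
//	if _, err := io.Copy(h, src); err != nil {
//		// handle the write error
//	}
//	sum := h.h.Sum(nil) // MD5 of the bytes written to dst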
type hasher struct {
h hash.Hash
w io.Writer
}
func (h *hasher) Write(p []byte) (int, error) {
n, err := h.w.Write(p)
_, _ = h.h.Write(p[:n])
return n, err
}
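// write streams an upload to GCS, optionally wrapping it in a tar header and
// gzip-compressing it on the fly, and then verifies that the object's remote
// MD5 matches the locally computed one.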
func (s *cloudSink) write(ctx context.Context, upload *artifactory.Upload) error {
var reader io.Reader
if upload.Source != "" {
f, err := os.Open(upload.Source)
if err != nil {
return err
}
defer f.Close()
reader = f
} else {
reader = bytes.NewBuffer(upload.Contents)
}
obj := s.bucket.Object(upload.Destination)
// Set timeouts to fail fast on unresponsive connections.
tctx, cancel := context.WithTimeout(ctx, perFileUploadTimeout)
defer cancel()
sw := obj.If(storage.Conditions{DoesNotExist: true}).NewWriter(tctx)
sw.ChunkSize = chunkSize
sw.ContentType = "application/octet-stream"
if upload.Compress {
sw.ContentEncoding = "gzip"
}
if upload.Metadata != nil {
sw.Metadata = upload.Metadata
}
// The CustomTime needs to be set to work with the lifecycle condition
// set on the GCS bucket.
sw.CustomTime = time.Now()
// We optionally compress on the fly, and calculate the MD5 on the
// compressed data.
// Writes happen asynchronously, so a nil error may be returned while the write
// goes on to fail. It is recommended in
// https://godoc.org/cloud.google.com/go/storage#Writer.Write
// to return the value of Close() to detect the success of the write.
// Note that the gzip compressor must be closed before the storage writer
// that it wraps.
h := &hasher{md5.New(), sw}
var writeErr, tarErr, zipErr error
if upload.Compress {
gzw := gzip.NewWriter(h)
if upload.TarHeader != nil {
tw := tar.NewWriter(gzw)
writeErr = tw.WriteHeader(upload.TarHeader)
if writeErr == nil {
_, writeErr = io.Copy(tw, reader)
}
tarErr = tw.Close()
} else {
_, writeErr = io.Copy(gzw, reader)
}
zipErr = gzw.Close()
} else {
_, writeErr = io.Copy(h, reader)
}
closeErr := sw.Close()
// Keep the first error we encountered and vet it for 'permissible' GCS
// error states.
// Note: consider an errorsmisc.FirstNonNil() helper if we see this logic again.
err := writeErr
if err == nil {
err = tarErr
}
if err == nil {
err = zipErr
}
if err == nil {
err = closeErr
}
if err = checkGCSErr(ctx, err, upload.Destination); err != nil {
return err
}
// Now confirm that the MD5 matches upstream, just in case. If the file was
// uploaded by another client (a race condition), loop until the MD5 is set.
// This guarantees that the file is properly uploaded before this function
// quits.
d := h.h.Sum(nil)
t := time.Second
const max = 30 * time.Second
for {
attrs, err := gcsutil.ObjectAttrs(ctx, obj)
if err != nil {
return fmt.Errorf("failed to confirm MD5 for %s due to: %w", upload.Destination, err)
}
if len(attrs.MD5) == 0 {
time.Sleep(t)
if t += t / 2; t > max {
t = max
}
logger.Debugf(ctx, "waiting for MD5 for %s", upload.Destination)
continue
}
if !bytes.Equal(attrs.MD5, d) {
return md5MismatchError{fmt.Errorf("MD5 mismatch for %s; local: %x, remote: %x", upload.Destination, d, attrs.MD5)}
}
break
}
return nil
}
// TODO(fxbug.dev/78017): Delete this type once fixed.
type md5MismatchError struct {
error
}
// checkGCSErr validates the error for a GCS upload.
//
// If the precondition of the object not existing is not met on write (i.e.,
// at the time of the write the object is there), then the server will
// respond with a 412. (See
// https://cloud.google.com/storage/docs/json_api/v1/status-codes and
// https://tools.ietf.org/html/rfc7232#section-4.2.)
// We do not report this as an error, however, as the associated object might
// have been created by another job after we checked its non-existence - and we
// wish to be resilient in the event of such a race.
func checkGCSErr(ctx context.Context, err error, name string) error {
if err == nil || err == io.EOF {
return nil
}
if strings.Contains(err.Error(), "Error 412") {
logger.Debugf(ctx, "object %q: encountered recoverable race condition during upload, already exists remotely", name)
return nil
}
return err
}
// dirToFiles returns a list of the top-level files in the dir if dir.Recursive
// is false, else it returns all files in the dir.
func dirToFiles(ctx context.Context, dir artifactory.Upload) ([]artifactory.Upload, error) {
var files []artifactory.Upload
var err error
var paths []string
if dir.Recursive {
err = filepath.Walk(dir.Source, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
relPath, err := filepath.Rel(dir.Source, path)
if err != nil {
return err
}
paths = append(paths, relPath)
}
return nil
})
} else {
var entries []os.FileInfo
entries, err = ioutil.ReadDir(dir.Source)
if err == nil {
for _, fi := range entries {
if fi.IsDir() {
continue
}
paths = append(paths, fi.Name())
}
}
}
if os.IsNotExist(err) {
logger.Debugf(ctx, "%s does not exist; skipping upload", dir.Source)
return nil, nil
} else if err != nil {
return nil, err
}
for _, path := range paths {
files = append(files, artifactory.Upload{
Source: filepath.Join(dir.Source, path),
Destination: filepath.Join(dir.Destination, path),
Deduplicate: dir.Deduplicate,
})
}
return files, nil
}
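// uploadFiles uploads the given files to the data sink using up to j
// concurrent workers. Deduplicated objects that already exist remotely are
// skipped, while collisions on non-deduplicated objects are reported as
// errors. Skipped objects whose CustomTime is older than daysSinceCustomTime
// are listed in an objs_to_refresh_ttl.txt manifest uploaded under
// buildsNamespaceDir so that a post-processor can refresh their TTLs.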
func uploadFiles(ctx context.Context, files []artifactory.Upload, dest dataSink, j int, buildsNamespaceDir string) error {
if j <= 0 {
return fmt.Errorf("Concurrency factor j must be a positive number")
}
uploads := make(chan artifactory.Upload, j)
errs := make(chan error, j)
queueUploads := func() {
defer close(uploads)
for _, f := range files {
if len(f.Source) != 0 {
fileInfo, err := os.Stat(f.Source)
if err != nil {
// The associated artifacts might not actually have been created, which is valid.
if os.IsNotExist(err) {
logger.Infof(ctx, "%s does not exist; skipping upload", f.Source)
continue
}
errs <- err
return
}
mtime := strconv.FormatInt(fileInfo.ModTime().Unix(), 10)
if f.Metadata == nil {
f.Metadata = map[string]string{}
}
f.Metadata[googleReservedFileMtime] = mtime
}
uploads <- f
}
}
objsToRefreshTTL := make(chan string)
var wg sync.WaitGroup
wg.Add(j)
upload := func() {
defer wg.Done()
for upload := range uploads {
exists, attrs, err := dest.objectExistsAt(ctx, upload.Destination)
if err != nil {
errs <- err
return
}
if exists {
logger.Debugf(ctx, "object %q: already exists remotely", upload.Destination)
if !upload.Deduplicate {
errs <- fmt.Errorf("object %q: collided", upload.Destination)
return
}
// Queue objects whose CustomTime is older than daysSinceCustomTime so their timestamps can be refreshed.
if attrs != nil && time.Now().AddDate(0, 0, -daysSinceCustomTime).After(attrs.CustomTime) {
objsToRefreshTTL <- upload.Destination
}
continue
}
if err := uploadFile(ctx, upload, dest); err != nil {
// If a deduplicated file already exists remotely but the local
// and remote MD5 hashes don't match, upload our version to a
// namespaced path so it can be compared with the already-existing
// version to help with debugging.
// TODO(fxbug.dev/78017): Delete this logic once fixed.
var md5Err md5MismatchError
if errors.As(err, &md5Err) {
upload.Destination = path.Join(buildsNamespaceDir, "md5-mismatches", upload.Destination)
if err := uploadFile(ctx, upload, dest); err != nil {
logger.Warningf(ctx, "failed to upload md5-mismatch file %q for debugging: %s", upload.Destination, err)
}
}
errs <- err
return
}
}
}
go queueUploads()
for i := 0; i < j; i++ {
go upload()
}
go func() {
wg.Wait()
close(errs)
close(objsToRefreshTTL)
}()
var objs []string
for o := range objsToRefreshTTL {
objs = append(objs, o)
}
if err := <-errs; err != nil {
return err
}
// Upload a file listing all the deduplicated files that already existed in the
// upload destination. A post-processor will use this file to update the CustomTime
// of the objects and extend their TTL.
if len(objs) > 0 {
objsToRefreshTTLUpload := artifactory.Upload{
Contents: []byte(strings.Join(objs, "\n")),
Destination: path.Join(buildsNamespaceDir, objsToRefreshTTLTxt),
}
return uploadFile(ctx, objsToRefreshTTLUpload, dest)
}
return nil
}
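// uploadFile writes a single upload to the data sink, retrying via
// gcsutil.Retry if the write fails.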
func uploadFile(ctx context.Context, upload artifactory.Upload, dest dataSink) error {
logger.Debugf(ctx, "object %q: attempting creation", upload.Destination)
if err := gcsutil.Retry(ctx, func() error {
err := dest.write(ctx, &upload)
if err != nil {
logger.Warningf(ctx, "error uploading %q: %s", upload.Destination, err)
}
return err
}); err != nil {
return fmt.Errorf("%s: %w", upload.Destination, err)
}
logger.Infof(ctx, "object %q: created", upload.Destination)
return nil
}