[artifacts] Add a subcommand to log test outputs

Re-refactor artifacts now that I know some things don't
need to be part of the public interface.

This tool will eventually be used in place of manual file uploads
on fuchsia ci bots so that all readers/writers can expect the
same filesystem hierarchy (e.g. resultstore uploader and e2e tests
in fuchsia.git)

TEST=Uploaded dummy data to CloudStorage to ensure the directory
 structure is correct. (gs://fuchsia-infra-test/builds/kendal-test/)

Change-Id: If38aac47e9cfe12201c88032b9ba464d7a732277
diff --git a/artifacts/artifacts.go b/artifacts/artifacts.go
index ee43f21..2012513 100644
--- a/artifacts/artifacts.go
+++ b/artifacts/artifacts.go
@@ -29,7 +29,7 @@
 // build is the BuildBucket build ID.
 func (c *Client) GetBuildDir(bucket, build string) *BuildDirectory {
 	bkt := c.client.Bucket(bucket)
-	return &BuildDirectory{&Directory{
+	return &BuildDirectory{&directory{
 		bucket: bkt,
 		root:   strings.Join([]string{"builds", build}, "/"),
 	}}
diff --git a/artifacts/directory.go b/artifacts/directory.go
index 090d9b7..c1c8998 100644
--- a/artifacts/directory.go
+++ b/artifacts/directory.go
@@ -13,23 +13,43 @@
 	"google.golang.org/api/iterator"
 )
 
-// Directory is a handle to a Cloud Storage "directory". It provides a minimal
+// The default name of a test's output file. We only store a single file containing a
+// test's combined stdout and stderr streams today.  This will change in the future.
+const DefaultTestOutputName = "stdout-stderr.txt"
+
+// BuildDirectory represents a Fuchsia CI build's artifact directory. Refer to the
+// layout in doc.go for the layout of this directory. When amending the layout, prefer
+// adding convenience methods on this type to encourages all clients to create objects
+// within this BuildDirectory using the same layout.
+type BuildDirectory struct {
+	*directory
+}
+
+// NewTestOutputObject creates a new ObjectHandle to hold the output of the given test
+// execution in this BuildDirectory. testName is the name of the test, envName is the
+// canonical name of the test environment. Both are normalized according to
+// normalizePathSegment.
+func (d BuildDirectory) NewTestOutputObject(ctx context.Context, testName, envName string) *storage.ObjectHandle {
+	return d.cd("tests").cd(testName).cd(envName).Object(DefaultTestOutputName)
+}
+
+// directory is a handle to a Cloud Storage "directory". It provides a minimal
 // filesystem-like interface for a Cloud Storage object hierarchy where "/" is used as the
 // path separator. Any methods added to this struct are forward to other directory types.
-type Directory struct {
+type directory struct {
 	bucket *storage.BucketHandle
 	root   string
 }
 
 // Object returns a handle to the given object within this directory. path is the path to
 // the object relative to this directory.
-func (d *Directory) Object(path string) *storage.ObjectHandle {
+func (d *directory) Object(path string) *storage.ObjectHandle {
 	path = fmt.Sprintf("%s/%s", d.root, path)
 	return d.bucket.Object(path)
 }
 
 // List lists all of the objects in this directory.
-func (d *Directory) List(ctx context.Context) ([]string, error) {
+func (d *directory) List(ctx context.Context) ([]string, error) {
 	prefix := strings.Join([]string{d.root}, "/")
 	iter := d.bucket.Objects(ctx, &storage.Query{
 		Prefix: prefix,
@@ -50,39 +70,13 @@
 	return items, nil
 }
 
-// CD returns a handle to some child Directory of this Directory. It is up to the caller
-// to ensure that child exists and is actually a directory.
-func (d *Directory) cd(child string) *Directory {
-	return &Directory{
+// CD returns a handle to some child directory of this directory. directChild should be a
+// direct child of the current directory, not a grandchild or any entry deeper in the
+// tree. The child's name is normalized according to normalizePathSegment, so using a
+// nested path may result in an unexpected file tree.
+func (d *directory) cd(directChild string) *directory {
+	return &directory{
 		bucket: d.bucket,
-		root:   fmt.Sprintf("%s/%s", d.root, child),
+		root:   fmt.Sprintf("%s/%s", d.root, normalizePathSegment(directChild)),
 	}
 }
-
-// BuildDirectory represents a Fuchsia CI build's artifact directory. Refer to the
-// layout in doc.go for the layout of this directory. When amending the layout, prefer
-// adding methods to create new subdirectories on this type instead of calling Object()
-// with paths containing slashes. This encourages all clients of this package create
-// objects within this BuildDirectory using the same layout.
-type BuildDirectory struct {
-	*Directory
-}
-
-// Test returns the TestDirectory containing artifacts for a particular test. The name is
-// normalized according to normalizePathSegment.
-func (d BuildDirectory) Test(name string) *TestDirectory {
-	subdir := fmt.Sprintf("tests/%s", normalizePathSegment(name))
-	return &TestDirectory{d.Directory.cd(subdir)}
-}
-
-// TestDirectory contains artifacts for a particular test.
-type TestDirectory struct {
-	*Directory
-}
-
-// Env returns a Directory for objects relevant to an execution of this TestDirectory's
-// test in a particular environment. The name is normalized according to
-// normalizePathSegment.
-func (d TestDirectory) Env(name string) *Directory {
-	return d.Directory.cd(normalizePathSegment(name))
-}
diff --git a/cmd/artifacts/main.go b/cmd/artifacts/main.go
index b6e565a..eda9bad 100644
--- a/cmd/artifacts/main.go
+++ b/cmd/artifacts/main.go
@@ -7,15 +7,16 @@
 //
 // Example usage:
 //
-// $ artifacts ls -build=1234567890
-// $ artifacts cp -build=2345678900 -src=packages.tar.gz -dst=local/packages.tar.gz
+// $ artifacts ls -build=123
+// $ artifacts cp -build=123 -src=packages.tar.gz -dst=local/packages.tar.gz
+// $ artifacts storetestoutputs -build=123 -testenv=TEST_ENV outputs.json
+
 package main
 
 import (
 	"context"
 	"flag"
 	"os"
-
 	"github.com/google/subcommands"
 )
 
@@ -25,6 +26,7 @@
 	subcommands.Register(subcommands.FlagsCommand(), "")
 	subcommands.Register(&ListCommand{}, "")
 	subcommands.Register(&CopyCommand{}, "")
+	subcommands.Register(&StoreTestOutputsCommand{}, "")
 
 	flag.Parse()
 	os.Exit(int(subcommands.Execute(context.Background())))
diff --git a/cmd/artifacts/storetestoutputs.go b/cmd/artifacts/storetestoutputs.go
new file mode 100644
index 0000000..1ed44a3
--- /dev/null
+++ b/cmd/artifacts/storetestoutputs.go
@@ -0,0 +1,176 @@
+// Copyright 2019 The Fuchsia Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"sync"
+
+	"fuchsia.googlesource.com/tools/artifacts"
+	"github.com/google/subcommands"
+)
+
+const (
+	// The maximum number of concurrent uploads. We rate limit this because we don't know
+	// how many entries there are in the input manifest file, so we don't know how many
+	// go-routines will be kicked off during execution. At around 100 or so concurrent
+	// uploads, the Golang storage API starts returning 400 errors and files fail to flush
+	// (when writer.Close() is called). Tweak this number as needed.
+	maxConcurrentUploads = 100
+)
+
+// TestOutputsManifest describes how to upload test files. This is intentionally written
+// to match the schema of Fuchsia's existing summary.json "tests" field for backward
+// compatibility. We should migrate away from using a single output file for stdout and
+// stderr and prefer uploading separate files.
+type TestOutputsManifest = []TestOutputs
+type TestOutputs struct {
+	Name       string `json:"name"`
+	OutputFile string `json:"output_file"`
+}
+
+// StoreTestOutputsCommand performs a batch upload of test outputs to Cloud Storage.
+type StoreTestOutputsCommand struct {
+	bucket  string
+	build   string
+	testEnv string
+	workers sync.WaitGroup
+}
+
+func (*StoreTestOutputsCommand) Name() string {
+	return "storetestoutputs"
+}
+
+func (*StoreTestOutputsCommand) Usage() string {
+	return fmt.Sprintf(`storetestoutputs [flags] outputs.json
+
+The input manifest is a JSON list of objects with the following scheme:
+
+	{
+	  "name": "The name of the test",
+	  "output_file": "/path/to/test/output/file"
+	}
+
+output_file is written to Cloud Storage as just %q within the hierarchy documented in
+//tools/artifacts/doc.go.`, artifacts.DefaultTestOutputName)
+}
+
+func (*StoreTestOutputsCommand) Synopsis() string {
+	return fmt.Sprintf("stores test output files in Cloud Storage")
+}
+
+func (cmd *StoreTestOutputsCommand) SetFlags(f *flag.FlagSet) {
+	f.StringVar(&cmd.bucket, "bucket", "", "The Cloud Storage bucket to write to")
+	f.StringVar(&cmd.build, "build", "", "The BuildBucket build ID")
+	f.StringVar(&cmd.testEnv, "testenv", "", "A canonical name for the test environment")
+}
+
+func (cmd *StoreTestOutputsCommand) Execute(ctx context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {
+	if err := cmd.validateFlags(f); err != nil {
+		log.Println(err)
+		return subcommands.ExitFailure
+	}
+	manifest, err := cmd.parseManifestFile(f.Arg(0))
+	if err != nil {
+		log.Println(err)
+		return subcommands.ExitFailure
+	}
+	if !cmd.execute(ctx, manifest) {
+		return subcommands.ExitFailure
+	}
+	return subcommands.ExitSuccess
+}
+
+func (cmd *StoreTestOutputsCommand) parseManifestFile(path string) (TestOutputsManifest, error) {
+	bytes, err := ioutil.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read %q: %v", path, err)
+	}
+	var manifest TestOutputsManifest
+	if err := json.Unmarshal(bytes, &manifest); err != nil {
+		return nil, fmt.Errorf("fail to unmarshal manifest: %v", err)
+	}
+	return manifest, nil
+}
+
+func (cmd *StoreTestOutputsCommand) validateFlags(f *flag.FlagSet) error {
+	if f.NArg() != 1 {
+		return fmt.Errorf("expect exactly 1 positional argument")
+	}
+	if cmd.bucket == "" {
+		return fmt.Errorf("missing -bucket")
+	}
+	if cmd.build == "" {
+		return fmt.Errorf("missing -build")
+	}
+	if cmd.testEnv == "" {
+		return fmt.Errorf("missing -testenv")
+	}
+	return nil
+}
+
+// execute spawns a worker pool to perform the upload. The pool is limited in size by
+// maxConcurrentUploads to avoid load failures in the storage API.
+func (cmd *StoreTestOutputsCommand) execute(ctx context.Context, manifest TestOutputsManifest) bool {
+	const workerCount = maxConcurrentUploads
+	success := true
+	outputs := make(chan TestOutputs)
+	errs := make(chan error, workerCount)
+	for i := 0; i < workerCount; i++ {
+		cmd.workers.Add(1)
+		go cmd.worker(ctx, &cmd.workers, outputs, errs)
+	}
+	for _, entry := range manifest {
+		outputs <- entry
+	}
+	close(outputs)
+	go func() {
+		for err := range errs {
+			success = false
+			log.Println(err)
+		}
+	}()
+	cmd.workers.Wait()
+	close(errs)
+	return success
+}
+
+func (cmd *StoreTestOutputsCommand) worker(ctx context.Context, wg *sync.WaitGroup, outputs <-chan TestOutputs, errs chan<- error) {
+	defer wg.Done()
+	cli, err := artifacts.NewClient(ctx)
+	if err != nil {
+		errs <- err
+		return
+	}
+	dir := cli.GetBuildDir(cmd.bucket, cmd.build)
+	for output := range outputs {
+		if err := cmd.upload(context.Background(), output, dir); err != nil {
+			errs <- err
+		}
+	}
+}
+
+func (cmd *StoreTestOutputsCommand) upload(ctx context.Context, outputs TestOutputs, dir *artifacts.BuildDirectory) error {
+	fd, err := os.Open(outputs.OutputFile)
+	if err != nil {
+		return fmt.Errorf("failed to read %q: %v", outputs.OutputFile, err)
+	}
+	object := dir.NewTestOutputObject(ctx, outputs.Name, cmd.testEnv)
+	writer := object.NewWriter(ctx)
+	if _, err := io.Copy(writer, fd); err != nil {
+		return fmt.Errorf("failed to write %q: %v", object.ObjectName(), err)
+	}
+	if err := writer.Close(); err != nil {
+		return fmt.Errorf("failed to flush bufferfor %q: %v", object.ObjectName(), err)
+	}
+	return nil
+}