[host-target-testing] Support lazily fetching blobs

Some of the OTA test steps do not need all the blobs, like when we pave N-1
onto a device, then validate with sl4f. This introduces a BlobFetchMode,
where if we get a repository with LazilyFetchBlobs, we will only download
blobs when they're requested by the test device. This can half the artifacts
we need to download when getting the old version.

To go even further, we could be even smarter and compare the blobs from
N-1 and N, and only prefetch the blobs that are only in N, but we can address
that optimization down the road.

Change-Id: I4d28694a72fa0b3341cc9fc71e4ec84f1668dd0c
Reviewed-on: https://fuchsia-review.googlesource.com/c/fuchsia/+/512311
Commit-Queue: Erick Tryzelaar <etryzelaar@google.com>
Reviewed-by: John Wittrock <wittrock@google.com>
diff --git a/src/sys/pkg/tests/system-tests/reboot_test/reboot_test.go b/src/sys/pkg/tests/system-tests/reboot_test/reboot_test.go
index fe85e2d..bdf950e 100644
--- a/src/sys/pkg/tests/system-tests/reboot_test/reboot_test.go
+++ b/src/sys/pkg/tests/system-tests/reboot_test/reboot_test.go
@@ -143,7 +143,8 @@
 	build artifacts.Build,
 	rpcClient **sl4f.Client,
 ) error {
-	repo, err := build.GetPackageRepository(ctx)
+	// We don't install an OTA, so we don't need to prefetch the blobs.
+	repo, err := build.GetPackageRepository(ctx, artifacts.LazilyFetchBlobs)
 	if err != nil {
 		return fmt.Errorf("unable to get repository: %w", err)
 	}
@@ -211,7 +212,7 @@
 ) (*sl4f.Client, error) {
 	logger.Infof(ctx, "Initializing device")
 
-	repo, err := build.GetPackageRepository(ctx)
+	repo, err := build.GetPackageRepository(ctx, artifacts.LazilyFetchBlobs)
 	if err != nil {
 		return nil, err
 	}
diff --git a/src/sys/pkg/tests/system-tests/tracking_test/tracking_test.go b/src/sys/pkg/tests/system-tests/tracking_test/tracking_test.go
index 96ce4cb..b5c19c9 100644
--- a/src/sys/pkg/tests/system-tests/tracking_test/tracking_test.go
+++ b/src/sys/pkg/tests/system-tests/tracking_test/tracking_test.go
@@ -155,7 +155,7 @@
 		return fmt.Errorf("failed to find build %s: %w", buildID, err)
 	}
 
-	repo, err := build.GetPackageRepository(ctx)
+	repo, err := build.GetPackageRepository(ctx, artifacts.PrefetchBlobs)
 	if err != nil {
 		return fmt.Errorf("failed to get repo for build: %w", err)
 	}
@@ -209,7 +209,9 @@
 }
 
 func paveDevice(ctx context.Context, device *device.Client, downgradeBuild artifacts.Build) (*sl4f.Client, error) {
-	downgradeRepo, err := downgradeBuild.GetPackageRepository(ctx)
+	// We don't need to prefetch all the blobs, since we only use a subset of
+	// packages from the repository, like run, sl4f.
+	downgradeRepo, err := downgradeBuild.GetPackageRepository(ctx, artifacts.LazilyFetchBlobs)
 	if err != nil {
 		return nil, fmt.Errorf("error getting downgrade repository: %w", err)
 	}
diff --git a/src/sys/pkg/tests/system-tests/upgrade_test/upgrade_test.go b/src/sys/pkg/tests/system-tests/upgrade_test/upgrade_test.go
index e6b7b64..eb284db 100644
--- a/src/sys/pkg/tests/system-tests/upgrade_test/upgrade_test.go
+++ b/src/sys/pkg/tests/system-tests/upgrade_test/upgrade_test.go
@@ -167,7 +167,7 @@
 
 	startTime := time.Now()
 
-	repo, err := build.GetPackageRepository(ctx)
+	repo, err := build.GetPackageRepository(ctx, artifacts.PrefetchBlobs)
 	if err != nil {
 		return fmt.Errorf("error getting repository: %w", err)
 	}
@@ -223,7 +223,9 @@
 	var err error
 
 	if build != nil {
-		repo, err = build.GetPackageRepository(ctx)
+		// We don't need to prefetch all the blobs, since we only use a subset of
+		// packages from the repository, like run, sl4f.
+		repo, err = build.GetPackageRepository(ctx, artifacts.LazilyFetchBlobs)
 		if err != nil {
 			return nil, fmt.Errorf("error getting downgrade repository: %w", err)
 		}
diff --git a/src/testing/host-target-testing/artifacts/archive.go b/src/testing/host-target-testing/artifacts/archive.go
index b0d80db..8b64faa 100644
--- a/src/testing/host-target-testing/artifacts/archive.go
+++ b/src/testing/host-target-testing/artifacts/archive.go
@@ -66,11 +66,11 @@
 	}
 
 	return &ArtifactsBuild{
-		id:                 id,
-		archive:            a,
-		dir:                dir,
-		sshPublicKey:       publicKey,
-		srcs:               srcsMap,
+		id:           id,
+		archive:      a,
+		dir:          dir,
+		sshPublicKey: publicKey,
+		srcs:         srcsMap,
 	}, nil
 }
 
diff --git a/src/testing/host-target-testing/artifacts/build.go b/src/testing/host-target-testing/artifacts/build.go
index da85d94..f4135b4 100644
--- a/src/testing/host-target-testing/artifacts/build.go
+++ b/src/testing/host-target-testing/artifacts/build.go
@@ -23,12 +23,22 @@
 	"go.fuchsia.dev/fuchsia/tools/lib/logger"
 )
 
+type BlobFetchMode int
+
+const (
+	// PrefetchBlobs will download all the blobs from a build when `GetPackageRepository()` is called.
+	PrefetchBlobs BlobFetchMode = iota
+
+	// LazilyFetchBlobs will only download blobs when they are accessed.
+	LazilyFetchBlobs
+)
+
 type Build interface {
 	// GetBootserver returns the path to the bootserver used for paving.
 	GetBootserver(ctx context.Context) (string, error)
 
 	// GetPackageRepository returns a Repository for this build.
-	GetPackageRepository(ctx context.Context) (*packages.Repository, error)
+	GetPackageRepository(ctx context.Context, blobFetchMode BlobFetchMode) (*packages.Repository, error)
 
 	// GetPaverDir downloads and returns the directory containing the images
 	// and image manifest.
@@ -46,13 +56,13 @@
 
 // ArtifactsBuild represents the build artifacts for a specific build.
 type ArtifactsBuild struct {
-	id                 string
-	archive            *Archive
-	dir                string
-	packages           *packages.Repository
-	buildImageDir      string
-	sshPublicKey       ssh.PublicKey
-	srcs               map[string]struct{}
+	id            string
+	archive       *Archive
+	dir           string
+	packages      *packages.Repository
+	buildImageDir string
+	sshPublicKey  ssh.PublicKey
+	srcs          map[string]struct{}
 }
 
 func (b *ArtifactsBuild) GetBootserver(ctx context.Context) (string, error) {
@@ -71,7 +81,7 @@
 // GetPackageRepository returns a Repository for this build. It tries to
 // download a package when all the artifacts are stored in individual files,
 // which is how modern builds publish their build artifacts.
-func (b *ArtifactsBuild) GetPackageRepository(ctx context.Context) (*packages.Repository, error) {
+func (b *ArtifactsBuild) GetPackageRepository(ctx context.Context, fetchMode BlobFetchMode) (*packages.Repository, error) {
 	if b.packages != nil {
 		return b.packages, nil
 	}
@@ -113,21 +123,54 @@
 	for _, blob := range blobs {
 		blobsList = append(blobsList, filepath.Join("blobs", blob.Merkle))
 	}
-	logger.Infof(ctx, "all_blobs contains %d blobs", len(blobsList))
+	logger.Infof(ctx, "all_blobs contains %d blobs", len(blobs))
 
 	blobsDir := filepath.Join(b.dir, "blobs")
-	p, err := packages.NewRepository(ctx, packagesDir, blobsDir)
+
+	if fetchMode == PrefetchBlobs {
+		if err := b.archive.download(ctx, b.id, true, filepath.Dir(blobsDir), blobsList); err != nil {
+			logger.Errorf(ctx, "failed to download blobs to %s: %v", blobsDir, err)
+			return nil, fmt.Errorf("failed to download blobs to %s: %w", blobsDir, err)
+		}
+	}
+
+	p, err := packages.NewRepository(ctx, packagesDir, &proxyBlobStore{b, blobsDir})
 	if err != nil {
 		return nil, err
 	}
 	b.packages = p
 
-	if err := b.archive.download(ctx, b.id, true, filepath.Dir(blobsDir), blobsList); err != nil {
-		logger.Errorf(ctx, "failed to download blobs to %s: %v", blobsDir, err)
-		return nil, fmt.Errorf("failed to download blobs to %s: %w", blobsDir, err)
+	return b.packages, nil
+}
+
+type proxyBlobStore struct {
+	b   *ArtifactsBuild
+	dir string
+}
+
+func (fs *proxyBlobStore) OpenBlob(ctx context.Context, merkle string) (*os.File, error) {
+	path := filepath.Join(fs.dir, merkle)
+
+	// First, try to read the blob from the directory
+	if f, err := os.Open(path); err == nil {
+		return f, nil
 	}
 
-	return b.packages, nil
+	// Otherwise, start downloading the blob. The package resolver will only
+	// fetch a blob once, so we don't need to deduplicate requests on our side.
+
+	logger.Infof(ctx, "download blob from build %s: %s", fs.b.id, merkle)
+
+	src := filepath.Join("blobs", merkle)
+	if err := fs.b.archive.download(ctx, fs.b.id, true, path, []string{src}); err != nil {
+		return nil, err
+	}
+
+	return os.Open(path)
+}
+
+func (fs *proxyBlobStore) Dir() string {
+	return fs.dir
 }
 
 // GetBuildImages downloads the build images for a specific build id.
@@ -150,13 +193,13 @@
 	}
 
 	if len(imageSrcs) == 0 {
-    return "", fmt.Errorf("build %s has no images/ directory", b.id)
+		return "", fmt.Errorf("build %s has no images/ directory", b.id)
 	}
 
 	imageDir := filepath.Join(b.dir, b.id, "images")
 
 	if err := b.archive.download(ctx, b.id, false, filepath.Dir(imageDir), imageSrcs); err != nil {
-    return "", fmt.Errorf("failed to download images to %s: %w", imageDir, err)
+		return "", fmt.Errorf("failed to download images to %s: %w", imageDir, err)
 	}
 
 	b.buildImageDir = imageDir
@@ -259,8 +302,9 @@
 	return filepath.Join(b.dir, "host_x64/bootserver_new"), nil
 }
 
-func (b *FuchsiaDirBuild) GetPackageRepository(ctx context.Context) (*packages.Repository, error) {
-	return packages.NewRepository(ctx, filepath.Join(b.dir, "amber-files"), filepath.Join(b.dir, "amber-files", "repository", "blobs"))
+func (b *FuchsiaDirBuild) GetPackageRepository(ctx context.Context, blobFetchMode BlobFetchMode) (*packages.Repository, error) {
+	blobFS := packages.NewDirBlobStore(filepath.Join(b.dir, "amber-files", "repository", "blobs"))
+	return packages.NewRepository(ctx, filepath.Join(b.dir, "amber-files"), blobFS)
 }
 
 func (b *FuchsiaDirBuild) GetPaverDir(ctx context.Context) (string, error) {
@@ -321,8 +365,8 @@
 }
 
 // GetPackageRepository returns a Repository for this build.
-func (b *OmahaBuild) GetPackageRepository(ctx context.Context) (*packages.Repository, error) {
-	return b.build.GetPackageRepository(ctx)
+func (b *OmahaBuild) GetPackageRepository(ctx context.Context, blobFetchMode BlobFetchMode) (*packages.Repository, error) {
+	return b.build.GetPackageRepository(ctx, blobFetchMode)
 }
 
 func (b *OmahaBuild) GetPaverDir(ctx context.Context) (string, error) {
diff --git a/src/testing/host-target-testing/packages/package_builder.go b/src/testing/host-target-testing/packages/package_builder.go
index 45e8e4a..e897dfa 100644
--- a/src/testing/host-target-testing/packages/package_builder.go
+++ b/src/testing/host-target-testing/packages/package_builder.go
@@ -136,7 +136,7 @@
 	// Open repository
 	// Repository.Dir contains a trailing `repository` in the path that we don't want.
 	repoDir := path.Dir(pkgRepo.Dir)
-	pmRepo, err := repo.New(repoDir, pkgRepo.BlobsDir)
+	pmRepo, err := repo.New(repoDir, pkgRepo.BlobStore.Dir())
 	if err != nil {
 		return "", "", fmt.Errorf("failed to open repository at %q. %w", pkgRepo.Dir, err)
 	}
diff --git a/src/testing/host-target-testing/packages/package_builder_test.go b/src/testing/host-target-testing/packages/package_builder_test.go
index b4a195c..8232042 100644
--- a/src/testing/host-target-testing/packages/package_builder_test.go
+++ b/src/testing/host-target-testing/packages/package_builder_test.go
@@ -66,7 +66,7 @@
 	if err = pmRepo.CommitUpdates(true); err != nil {
 		t.Fatalf("failed to commit updates to repo: %s", err)
 	}
-	pkgRepo, err := NewRepository(ctx, dir, blobsDir)
+	pkgRepo, err := NewRepository(ctx, dir, NewDirBlobStore(blobsDir))
 	if err != nil {
 		t.Fatalf("failed to read repo: %s", err)
 	}
@@ -196,7 +196,7 @@
 		t.Fatalf("package path should be %q, not %q", fullPkgName, actualPkgName)
 	}
 
-	pkgRepo, err = NewRepository(ctx, path.Dir(pkgRepo.Dir), pkgRepo.BlobsDir)
+	pkgRepo, err = NewRepository(ctx, path.Dir(pkgRepo.Dir), pkgRepo.BlobStore)
 
 	// Confirm that the package is published and updated.
 	pkg, err = pkgRepo.OpenPackage(ctx, fullPkgName)
diff --git a/src/testing/host-target-testing/packages/repo.go b/src/testing/host-target-testing/packages/repo.go
index 26d1382..fab6fe7 100644
--- a/src/testing/host-target-testing/packages/repo.go
+++ b/src/testing/host-target-testing/packages/repo.go
@@ -16,10 +16,31 @@
 	"go.fuchsia.dev/fuchsia/tools/lib/logger"
 )
 
+type BlobStore interface {
+	Dir() string
+	OpenBlob(ctx context.Context, merkle string) (*os.File, error)
+}
+
+type DirBlobStore struct {
+	dir string
+}
+
+func NewDirBlobStore(dir string) BlobStore {
+	return &DirBlobStore{dir}
+}
+
+func (fs *DirBlobStore) OpenBlob(ctx context.Context, merkle string) (*os.File, error) {
+	return os.Open(filepath.Join(fs.dir, merkle))
+}
+
+func (fs *DirBlobStore) Dir() string {
+	return fs.dir
+}
+
 type Repository struct {
 	Dir string
 	// BlobsDir should be a directory called `blobs` where all the blobs are.
-	BlobsDir string
+	BlobStore BlobStore
 }
 
 type signed struct {
@@ -40,12 +61,12 @@
 
 // NewRepository parses the repository from the specified directory. It returns
 // an error if the repository does not exist, or it contains malformed metadata.
-func NewRepository(ctx context.Context, dir, blobsDir string) (*Repository, error) {
-	logger.Infof(ctx, "creating a repository for %q and %q", dir, blobsDir)
+func NewRepository(ctx context.Context, dir string, blobStore BlobStore) (*Repository, error) {
+	logger.Infof(ctx, "creating a repository for %q and %q", dir, blobStore.Dir())
 
 	// The repository may have out of date metadata. This updates the repository to
 	// the latest version so TUF won't complain about the data being old.
-	repo, err := repo.New(dir, blobsDir)
+	repo, err := repo.New(dir, blobStore.Dir())
 	if err != nil {
 		return nil, err
 	}
@@ -61,8 +82,8 @@
 	}
 
 	return &Repository{
-		Dir:      filepath.Join(dir, "repository"),
-		BlobsDir: blobsDir,
+		Dir:       filepath.Join(dir, "repository"),
+		BlobStore: blobStore,
 	}, nil
 }
 
@@ -74,7 +95,7 @@
 		return nil, fmt.Errorf("failed to extract packages: %w", err)
 	}
 
-	return NewRepository(ctx, filepath.Join(dst, "amber-files"), filepath.Join(dst, "amber-files", "repository", "blobs"))
+	return NewRepository(ctx, filepath.Join(dst, "amber-files"), NewDirBlobStore(filepath.Join(dst, "amber-files", "repository", "blobs")))
 }
 
 // OpenPackage opens a package from the repository.
@@ -100,11 +121,11 @@
 }
 
 func (r *Repository) OpenBlob(ctx context.Context, merkle string) (*os.File, error) {
-	return os.Open(filepath.Join(r.BlobsDir, merkle))
+	return r.BlobStore.OpenBlob(ctx, merkle)
 }
 
 func (r *Repository) Serve(ctx context.Context, localHostname string, repoName string) (*Server, error) {
-	return newServer(ctx, r.Dir, r.BlobsDir, localHostname, repoName)
+	return newServer(ctx, r.Dir, r.BlobStore, localHostname, repoName)
 }
 
 func (r *Repository) LookupUpdateSystemImageMerkle(ctx context.Context) (string, error) {
diff --git a/src/testing/host-target-testing/packages/server.go b/src/testing/host-target-testing/packages/server.go
index c40505e..74d1d29 100644
--- a/src/testing/host-target-testing/packages/server.go
+++ b/src/testing/host-target-testing/packages/server.go
@@ -10,6 +10,7 @@
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"io/fs"
 	"net"
 	"net/http"
 	"os"
@@ -25,14 +26,26 @@
 
 type Server struct {
 	Dir          string
-	BlobsDir     string
+	BlobStore    BlobStore
 	URL          string
 	Hash         string
 	server       *http.Server
 	shuttingDown chan struct{}
 }
 
-func newServer(ctx context.Context, dir, blobsDir string, localHostname string, repoName string) (*Server, error) {
+type httpBlobStore struct {
+	ctx       context.Context
+	blobStore BlobStore
+}
+
+func (f httpBlobStore) Open(path string) (fs.File, error) {
+	if !strings.HasPrefix(path, "blobs/") {
+		return nil, os.ErrNotExist
+	}
+	return f.blobStore.OpenBlob(f.ctx, strings.TrimPrefix(path, "blobs/"))
+}
+
+func newServer(ctx context.Context, dir string, blobStore BlobStore, localHostname string, repoName string) (*Server, error) {
 	listener, err := net.Listen("tcp", ":0")
 	if err != nil {
 		return nil, err
@@ -56,7 +69,7 @@
 	// Blobs requests come as `/blobs/<merkle>` so the directory we actually
 	// serve from should be the parent directory of the blobsDir and the blobsDir
 	// should be called `blobs`.
-	mux.Handle("/blobs/", http.FileServer(http.Dir(filepath.Dir(blobsDir))))
+	mux.Handle("/blobs/", http.FileServer(http.FS(httpBlobStore{ctx, blobStore})))
 	mux.Handle("/", http.FileServer(http.Dir(dir)))
 
 	server := &http.Server{
@@ -87,7 +100,7 @@
 
 	return &Server{
 		Dir:          dir,
-		BlobsDir:     blobsDir,
+		BlobStore:    blobStore,
 		URL:          configURL,
 		Hash:         configHash,
 		server:       server,