blob: e3d78d17e87a07b868231f3c6ed385b615c8e181 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package botanist
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"cloud.google.com/go/storage"
"go.fuchsia.dev/fuchsia/src/sys/pkg/bin/pm/pmhttp"
"go.fuchsia.dev/fuchsia/tools/lib/gcsutil"
"go.fuchsia.dev/fuchsia/tools/lib/logger"
)
const (
repoID = "fuchsia-pkg://fuchsia.com"
localhostPlaceholder = "localhost"
DefaultPkgSrvPort = 8083
)
var (
// getGCSReader allows us to stub out the GCS communiciation in the tests.
getGCSReader = getGCSReaderImpl
)
// downloadRecord contains a path we fetched from the remote and how much data
// we retrieved.
type downloadRecord struct {
// Path is the path requested by the package resolver.
Path string `json:"path"`
// Size is the size of the blob.
Size int `json:"size"`
}
func getGCSReaderImpl(ctx context.Context, client *storage.Client, bucket, path string) (io.ReadCloser, error) {
bkt := client.Bucket(bucket)
obj := bkt.Object(path)
return gcsutil.NewObjectReader(ctx, obj)
}
// cachedPkgRepo is a custom HTTP handler that acts as a GCS redirector with a
// local filesystem cache.
type cachedPkgRepo struct {
loggerCtx context.Context
downloadManifest []downloadRecord
gcsClient *storage.Client
repoPath string
repoURL *url.URL
blobURL *url.URL
}
func newCachedPkgRepo(ctx context.Context, repoPath, repoURL, blobURL string) (*cachedPkgRepo, error) {
client, err := storage.NewClient(ctx)
if err != nil {
return nil, err
}
rURL, err := url.Parse(repoURL)
if err != nil {
return nil, err
}
bURL, err := url.Parse(blobURL)
if err != nil {
return nil, err
}
return &cachedPkgRepo{
loggerCtx: ctx,
repoPath: repoPath,
gcsClient: client,
repoURL: rURL,
blobURL: bURL,
}, nil
}
func (c *cachedPkgRepo) logf(msg string, args ...interface{}) {
logger.Debugf(c.loggerCtx, fmt.Sprintf("[package server] %s", msg), args)
}
func (c *cachedPkgRepo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
localPath := path.Join(c.repoPath, r.URL.Path)
if _, err := os.Stat(localPath); err != nil {
// If the requested path does not exist locally, fetch it from GCS.
if errors.Is(err, os.ErrNotExist) {
statusCode, err := c.fetchFromGCS(r.Context(), r.URL, localPath)
if err != nil {
c.logf("failed to fetch %s from GCS: %s", r.URL.Path, err)
w.WriteHeader(statusCode)
return
}
} else {
c.logf("failed to stat %s: %s", localPath, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
contents, err := ioutil.ReadFile(localPath)
if err != nil {
c.logf("failed to read file %s: %s", localPath, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
// The package resolver requires the Content-Length header.
w.Header().Set("Content-Length", strconv.Itoa(len(contents)))
w.WriteHeader(http.StatusOK)
w.Write(contents)
}
func (c *cachedPkgRepo) fetchFromGCS(ctx context.Context, resource *url.URL, localPath string) (int, error) {
var bucket string
if strings.HasPrefix(resource.Path, "/repository") {
bucket = c.repoURL.Host
} else if strings.HasPrefix(resource.Path, "/blobs") {
bucket = c.blobURL.Host
} else {
return http.StatusBadRequest, fmt.Errorf("unsupported path: %s", resource.Path)
}
resourcePath := strings.TrimLeft(resource.Path, "/")
r, err := getGCSReader(ctx, c.gcsClient, bucket, resourcePath)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return http.StatusNotFound, err
}
return http.StatusInternalServerError, err
}
defer r.Close()
w, err := os.Create(localPath)
if err != nil {
return http.StatusInternalServerError, err
}
defer w.Close()
size, err := io.Copy(w, r)
if err != nil {
return http.StatusInternalServerError, err
}
dr := downloadRecord{
Path: resourcePath,
Size: int(size),
}
c.downloadManifest = append(c.downloadManifest, dr)
return http.StatusOK, nil
}
func (c *cachedPkgRepo) writeDownloadManifest(path string) error {
b, err := json.Marshal(c.downloadManifest)
if err != nil {
return err
}
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write(b)
return err
}
// PackageServer is the HTTP server that serves packages.
type PackageServer struct {
loggerCtx context.Context
srv *http.Server
c *cachedPkgRepo
downloadManifestPath string
RepoURL string
BlobURL string
}
func (p *PackageServer) Close() error {
logger.Debugf(p.loggerCtx, "stopping package server")
if p.downloadManifestPath != "" {
p.c.writeDownloadManifest(p.downloadManifestPath)
}
return p.srv.Close()
}
// NewPackageServer creates and starts a local package server.
func NewPackageServer(ctx context.Context, repoPath, remoteRepoURL, remoteBlobURL, downloadManifestPath string, port int) (*PackageServer, error) {
logger.Debugf(ctx, "creating package server serving from %s", repoPath)
// Create HTTP handlers for the package server.
rootJsonBytes, err := ioutil.ReadFile(filepath.Join(repoPath, "repository", "root.json"))
if err != nil {
return nil, err
}
cs := pmhttp.NewConfigServerV2(func() []byte {
return rootJsonBytes
}, false)
cPkgRepo, err := newCachedPkgRepo(ctx, repoPath, remoteRepoURL, remoteBlobURL)
if err != nil {
return nil, err
}
// Register the handlers and create the server.
mux := http.NewServeMux()
mux.Handle("/config.json", cs)
mux.Handle("/", cPkgRepo)
pkgSrv := &http.Server{
Handler: mux,
}
// Start the server and spin off a handler to stop it when the context
// is canceled.
pkgSrvStarted := make(chan struct{})
go func() {
addr := fmt.Sprintf("0.0.0.0:%d", port)
logger.Debugf(ctx, "starting package server on %s", addr)
l, err := net.Listen("tcp", addr)
if err != nil {
logger.Errorf(ctx, "listening on %s failed: %s", addr, err)
}
close(pkgSrvStarted)
if err := pkgSrv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Errorf(ctx, "package server failed: %s", err)
}
}()
// Do not return until the package server has actually started serving.
<-pkgSrvStarted
logger.Debugf(ctx, "package server started")
return &PackageServer{
loggerCtx: ctx,
srv: pkgSrv,
c: cPkgRepo,
downloadManifestPath: downloadManifestPath,
RepoURL: fmt.Sprintf("http://%s:%d/repository", localhostPlaceholder, port),
BlobURL: fmt.Sprintf("http://%s:%d/blobs", localhostPlaceholder, port),
}, nil
}