// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package botanist

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"cloud.google.com/go/storage"
	"go.fuchsia.dev/fuchsia/src/sys/pkg/bin/pm/pmhttp"
	"go.fuchsia.dev/fuchsia/tools/botanist/constants"
	"go.fuchsia.dev/fuchsia/tools/lib/gcsutil"
	"go.fuchsia.dev/fuchsia/tools/lib/logger"
)

const (
	repoID               = "fuchsia-pkg://fuchsia.com"
	localhostPlaceholder = "localhost"

	// DefaultPkgSrvPort is the default port on which the local package
	// server listens.
	DefaultPkgSrvPort = 8083
)

// getGCSReader allows us to stub out the GCS communication in the tests.
var getGCSReader = getGCSReaderImpl
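
// In tests, getGCSReader can be swapped for a stub. A minimal sketch (not the
// actual test fake; requires the bytes import) might look like:
//
//	getGCSReader = func(ctx context.Context, client *storage.Client, bucket, path string) (io.ReadCloser, int, error) {
//		data := []byte("fake blob contents")
//		return io.NopCloser(bytes.NewReader(data)), len(data), nil
//	}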

// downloadRecord contains a path we fetched from the remote and how much data
// we retrieved.
type downloadRecord struct {
	// Path is the path requested by the package resolver.
	Path string `json:"path"`
	// Size is the size of the blob in bytes.
	Size int `json:"size"`
}
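
// A download manifest is serialized by writeDownloadManifest as a JSON array
// of these records; an entry looks like the following (illustrative values):
//
//	[{"path": "blobs/15ec7bf0b50b...", "size": 4096}]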

// getGCSReaderImpl returns a reader for the object at the given bucket and
// path in GCS, along with the object's size in bytes.
func getGCSReaderImpl(ctx context.Context, client *storage.Client, bucket, path string) (io.ReadCloser, int, error) {
	obj := client.Bucket(bucket).Object(path)
	attrs, err := obj.Attrs(ctx)
	if err != nil {
		return nil, 0, err
	}
	r, err := gcsutil.NewObjectReader(ctx, obj)
	if err != nil {
		return nil, 0, err
	}
	return r, int(attrs.Size), nil
}

// cachedPkgRepo is a custom HTTP handler that acts as a GCS redirector with a
// local filesystem cache.
type cachedPkgRepo struct {
	loggerCtx        context.Context
	downloadManifest []downloadRecord
	gcsClient        *storage.Client
	fileServer       http.Handler
	repoPath         string
	repoURL          *url.URL
	blobURL          *url.URL

	// Serving metrics, reported when the server is closed.
	totalBytesServed    int
	serveTimeSec        float64
	totalBytesFetched   int
	gcsFetchTimeSec     float64
	totalRequestsServed int
}

func newCachedPkgRepo(ctx context.Context, repoPath, repoURL, blobURL string) (*cachedPkgRepo, error) {
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}
	rURL, err := url.Parse(repoURL)
	if err != nil {
		return nil, err
	}
	bURL, err := url.Parse(blobURL)
	if err != nil {
		return nil, err
	}
	return &cachedPkgRepo{
		loggerCtx:  ctx,
		fileServer: http.FileServer(http.Dir(repoPath)),
		repoPath:   repoPath,
		gcsClient:  client,
		repoURL:    rURL,
		blobURL:    bURL,
	}, nil
}
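
// Note that storage.NewClient authenticates with Application Default
// Credentials, so the package server assumes it runs in an environment with
// read access to the remote repository and blob buckets.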

func (c *cachedPkgRepo) logf(msg string, args ...interface{}) {
	logger.Debugf(c.loggerCtx, fmt.Sprintf("[package server] %s", msg), args...)
}

func (c *cachedPkgRepo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	startTime := time.Now()
	localPath := path.Join(c.repoPath, r.URL.Path)

	// If the requested path does not exist locally, fetch it from GCS.
	// Otherwise, serve the blob from the cache.
	if _, err := os.Stat(localPath); err != nil {
		if !errors.Is(err, os.ErrNotExist) {
			c.logf("failed to stat %s: %s", localPath, err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		c.fetchFromGCS(w, r, localPath)
	} else {
		c.fileServer.ServeHTTP(w, r)
	}

	// Update the serving metrics.
	if length := w.Header().Get("Content-Length"); length != "" {
		l, err := strconv.Atoi(length)
		if err != nil {
			c.logf("failed to convert content length %s to integer: %s", length, err)
		} else {
			c.totalBytesServed += l
			c.totalRequestsServed++
		}
	}
	c.serveTimeSec += time.Since(startTime).Seconds()
}
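
// fetchFromGCS streams the requested object from GCS to the client while
// writing it into the local cache. Objects are addressed by bucket host and
// URL path: for example, with a blob URL of "gs://example-blobs" (an
// illustrative bucket), a request for "/blobs/<merkle>" is fetched from
// gs://example-blobs/blobs/<merkle> and cached at <repoPath>/blobs/<merkle>.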
func (c *cachedPkgRepo) fetchFromGCS(w http.ResponseWriter, r *http.Request, localPath string) {
	startTime := time.Now()

	// Determine the GCS bucket from the request path.
	var bucket string
	if strings.HasPrefix(r.URL.Path, "/repository") {
		bucket = c.repoURL.Host
	} else if strings.HasPrefix(r.URL.Path, "/blobs") {
		bucket = c.blobURL.Host
	} else {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Retrieve a GCS reader for the requested object in the bucket.
	resourcePath := strings.TrimLeft(r.URL.Path, "/")
	c.logf("Downloading gs://%s/%s", bucket, resourcePath)
	gcsRdr, size, err := getGCSReader(r.Context(), c.gcsClient, bucket, resourcePath)
	if err != nil {
		if errors.Is(err, storage.ErrObjectNotExist) {
			w.WriteHeader(http.StatusNotFound)
		} else {
			c.logf("failed to get GCS reader: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
		}
		return
	}
	defer gcsRdr.Close()

	// Create a tempfile to hold the blob contents. We will move it into the
	// blob cache only if the download succeeds.
	if err := os.MkdirAll(filepath.Dir(localPath), os.ModePerm); err != nil {
		c.logf("failed to create parent dir for blob %s: %s", localPath, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	tf, err := os.CreateTemp(filepath.Dir(localPath), filepath.Base(localPath))
	if err != nil {
		c.logf("failed to create tempfile to hold blob %s: %s", localPath, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	cleanup := true
	defer func() {
		if cleanup {
			os.Remove(tf.Name())
		}
	}()

	// Stream the blob from the remote to both the client and the tempfile.
	w.Header().Set("Content-Length", strconv.Itoa(size))
	mw := io.MultiWriter(w, tf)
	if _, err := io.Copy(mw, gcsRdr); err != nil {
		c.logf("failed to copy blob %s: %s", localPath, err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// At this point the download has succeeded, so move the temporary file
	// holding the blob into the cache.
	tf.Close()
	if err := os.Rename(tf.Name(), localPath); err != nil {
		// This is not a fatal error, as future requests will just
		// re-download the blob.
		c.logf("failed to move downloaded blob %s to cache: %s", localPath, err)
	} else {
		// The blob has been renamed into place, so the temp file no longer
		// needs to be deleted.
		cleanup = false
	}

	// Update metrics and the download manifest.
	c.downloadManifest = append(c.downloadManifest, downloadRecord{
		Path: resourcePath,
		Size: size,
	})
	c.gcsFetchTimeSec += time.Since(startTime).Seconds()
	c.totalBytesFetched += size
}

// writeDownloadManifest writes the accumulated download records to the given
// path as JSON.
func (c *cachedPkgRepo) writeDownloadManifest(path string) error {
	b, err := json.Marshal(c.downloadManifest)
	if err != nil {
		return err
	}
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = f.Write(b)
	return err
}

// PackageServer is the HTTP server that serves packages.
type PackageServer struct {
	loggerCtx            context.Context
	srv                  *http.Server
	c                    *cachedPkgRepo
	downloadManifestPath string

	// RepoURL and BlobURL point at this server's /repository and /blobs
	// endpoints, respectively.
	RepoURL string
	BlobURL string
}

// Close stops the package server, writes the download manifest if one was
// requested, and logs serving metrics.
func (p *PackageServer) Close() error {
	logger.Debugf(p.loggerCtx, "stopping package server")
	if p.downloadManifestPath != "" {
		if err := p.c.writeDownloadManifest(p.downloadManifestPath); err != nil {
			logger.Errorf(p.loggerCtx, "failed to write download manifest: %s", err)
		}
	}
	// Calculate the bandwidth of package serving when the package server has
	// to fetch blobs from GCS.
	gcsFetchBandwidth := float64(p.c.totalBytesFetched) / p.c.gcsFetchTimeSec
	// Calculate the bandwidth of package serving when the package server is
	// serving blobs from disk.
	localServingBandwidth := float64(p.c.totalBytesServed-p.c.totalBytesFetched) / (p.c.serveTimeSec - p.c.gcsFetchTimeSec)
	logger.Debugf(p.loggerCtx, "----------------------------------------------------")
	logger.Debugf(p.loggerCtx, "Package server data")
	logger.Debugf(p.loggerCtx, "----------------------------------------------------")
	logger.Debugf(p.loggerCtx, "Total requests served: %d", p.c.totalRequestsServed)
	logger.Debugf(p.loggerCtx, "Total data served (bytes): %d", p.c.totalBytesServed)
	logger.Debugf(p.loggerCtx, "Total time spent serving (seconds): %f", p.c.serveTimeSec)
	logger.Debugf(p.loggerCtx, "Bandwidth using local package serving (bytes per second): %f", localServingBandwidth)
	logger.Debugf(p.loggerCtx, "Total data fetched from GCS (bytes): %d", p.c.totalBytesFetched)
	logger.Debugf(p.loggerCtx, "Total time spent downloading from GCS (seconds): %f", p.c.gcsFetchTimeSec)
	logger.Debugf(p.loggerCtx, "Bandwidth using GCS downloads (bytes per second): %f", gcsFetchBandwidth)
	logger.Debugf(p.loggerCtx, "----------------------------------------------------")
	return p.srv.Close()
}

// NewPackageServer creates and starts a local package server that serves the
// repository at repoPath, falling back to the remote repository and blob URLs
// on cache misses.
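//
// A typical invocation looks like the following sketch (the bucket URLs and
// manifest path are illustrative):
//
//	srv, err := NewPackageServer(ctx, repoPath,
//		"gs://example-repo-bucket", "gs://example-blob-bucket",
//		"/tmp/download_manifest.json", DefaultPkgSrvPort)
//	if err != nil {
//		return err
//	}
//	defer srv.Close()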
func NewPackageServer(ctx context.Context, repoPath, remoteRepoURL, remoteBlobURL, downloadManifestPath string, port int) (*PackageServer, error) {
	logger.Debugf(ctx, "creating package server serving from %s", repoPath)

	// Create HTTP handlers for the package server.
	rootJSONBytes, err := os.ReadFile(filepath.Join(repoPath, "repository", "root.json"))
	if err != nil {
		return nil, err
	}
	cs := pmhttp.NewConfigServerV2(func() []byte {
		return rootJSONBytes
	}, false)
	cPkgRepo, err := newCachedPkgRepo(ctx, repoPath, remoteRepoURL, remoteBlobURL)
	if err != nil {
		return nil, err
	}

	// Register the handlers and create the server.
	mux := http.NewServeMux()
	mux.Handle("/config.json", cs)
	mux.Handle("/", cPkgRepo)
	pkgSrv := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			lw := &loggingWriter{w, 0, 0}
			mux.ServeHTTP(lw, r)
			logger.Debugf(ctx, "Served %s \"%s %s %s\" %d %d",
				r.RemoteAddr,
				r.Method,
				r.RequestURI,
				r.Proto,
				lw.Status,
				lw.ResponseSize)
		}),
	}

	// Start the server in a background goroutine.
	pkgSrvStarted := make(chan struct{})
	go func() {
		addr := fmt.Sprintf("0.0.0.0:%d", port)
		logger.Debugf(ctx, "[package server] starting on %s", addr)
		l, err := net.Listen("tcp", addr)
		if err != nil {
			logger.Errorf(ctx, "%s: listening on %s failed: %s", constants.FailedToServeMsg, addr, err)
			// Unblock the caller; without a listener there is nothing to
			// serve.
			close(pkgSrvStarted)
			return
		}
		close(pkgSrvStarted)
		if err := pkgSrv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) {
			logger.Errorf(ctx, "%s: %s", constants.FailedToServeMsg, err)
		}
	}()

	// Do not return until the package server has actually started serving.
	<-pkgSrvStarted
	logger.Debugf(ctx, "package server started")
	return &PackageServer{
		loggerCtx:            ctx,
		srv:                  pkgSrv,
		c:                    cPkgRepo,
		downloadManifestPath: downloadManifestPath,
		RepoURL:              fmt.Sprintf("http://%s:%d/repository", localhostPlaceholder, port),
		BlobURL:              fmt.Sprintf("http://%s:%d/blobs", localhostPlaceholder, port),
	}, nil
}
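
// loggingWriter wraps an http.ResponseWriter to record the response status
// and the number of bytes written so that each request can be logged after it
// is served.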
type loggingWriter struct {
	http.ResponseWriter
	Status       int
	ResponseSize int64
}

func (lw *loggingWriter) WriteHeader(status int) {
	lw.Status = status
	lw.ResponseWriter.WriteHeader(status)
}

func (lw *loggingWriter) Write(b []byte) (int, error) {
	n, err := lw.ResponseWriter.Write(b)
	lw.ResponseSize += int64(n)
	return n, err
}