// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
	"context"
	"encoding/csv"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"
texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
epb "github.com/cloudprober/cloudprober/probes/external/proto"
"github.com/cloudprober/cloudprober/probes/external/serverutils"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"golang.org/x/sync/errgroup"
// Install google-c2p resolver, which is required for direct path.
_ "google.golang.org/grpc/xds/googledirectpath"
"google.golang.org/protobuf/proto"
// Install RLS load balancer policy, which is needed for gRPC RLS.
_ "google.golang.org/grpc/balancer/rls"
)
const (
codeVersion = "0.11.0" // to keep track of which version of the code a benchmark ran on
useDefault = -1
tracerName = "storage-benchmark"
)
var (
projectID, outputFile string
opts = &benchmarkOptions{}
results chan benchmarkResult
serverMode bool
)
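// benchmarkOptions holds the full configuration for a benchmark run; it is
// populated from command-line flags in parseFlags and summarized by String.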
type benchmarkOptions struct {
// all sizes are in bytes
bucket string
region string
outType outputType
numSamples int
numWorkers int
api benchmarkAPI
objectSize int64
minObjectSize int64
maxObjectSize int64
rangeSize int64
minReadOffset int64
maxReadOffset int64
readBufferSize int
writeBufferSize int
minChunkSize int64
maxChunkSize int64
forceGC bool
connPoolSize int
timeout time.Duration
timeoutPerOp time.Duration
continueOnFail bool
numClients int
workload int
numObjectsPerDirectory int
useGCSFuseConfig bool
endpoint string
enableTracing bool
traceSampleRate float64
warmup time.Duration
}
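// validate checks that the options are internally consistent: a read offset
// may only be set together with a range size, and the maximum offset must
// leave enough bytes in the smallest object to read a full range.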
func (b *benchmarkOptions) validate() error {
if err := b.api.validate(); err != nil {
return err
}
if err := b.outType.validate(); err != nil {
return err
}
if (b.maxReadOffset != 0 || b.minReadOffset != 0) && b.rangeSize == 0 {
return fmt.Errorf("read offset specified but no range size specified")
}
minObjSize := b.objectSize
if minObjSize == 0 {
minObjSize = b.minObjectSize
}
if b.maxReadOffset > minObjSize-b.rangeSize {
		return fmt.Errorf("read offset (%d) is too large for the selected range size (%d) - the object may run out of bytes before the full rangeSize is read", b.maxReadOffset, b.rangeSize)
}
return nil
}
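// String returns a human-readable, tab-aligned summary of the options for
// logging.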
func (b *benchmarkOptions) String() string {
var sb strings.Builder
stringifiedOpts := []string{
fmt.Sprintf("api:\t\t\t%s", b.api),
fmt.Sprintf("region:\t\t\t%s", b.region),
fmt.Sprintf("timeout:\t\t%s", b.timeout),
fmt.Sprintf("number of samples:\t%d", b.numSamples),
fmt.Sprintf("object size:\t\t%d kib", b.objectSize/kib),
		fmt.Sprintf("object size range (if no fixed size above):\t%d - %d kib", b.minObjectSize/kib, b.maxObjectSize/kib),
fmt.Sprintf("write size:\t\t%d bytes (app buffer for uploads)", b.writeBufferSize),
fmt.Sprintf("read size:\t\t%d bytes (app buffer for downloads)", b.readBufferSize),
fmt.Sprintf("chunk size:\t\t%d - %d kib (library buffer for uploads)", b.minChunkSize/kib, b.maxChunkSize/kib),
fmt.Sprintf("range offset:\t\t%d - %d bytes ", b.minReadOffset, b.maxReadOffset),
fmt.Sprintf("range size:\t\t%d bytes (0 -> full object)", b.rangeSize),
fmt.Sprintf("connection pool size:\t%d (GRPC)", b.connPoolSize),
fmt.Sprintf("num workers:\t\t%d (max number of concurrent benchmark runs at a time)", b.numWorkers),
fmt.Sprintf("force garbage collection:%t", b.forceGC),
}
for _, s := range stringifiedOpts {
sb.WriteByte('\n')
sb.WriteByte('\t')
sb.WriteString(s)
}
return sb.String()
}
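// parseFlags defines and parses all command-line flags, checks that a project
// is set, and interprets -object_size, which accepts either a fixed size in
// bytes or a "min..max" range.
//
// An illustrative invocation (flag values are examples only):
//
//	go run . -project=my-project -bucket=my-bucket -samples=1000 -workers=8 \
//		-api=JSON -object_size=1048576 -output_type=csv -o=results.csv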
func parseFlags() {
flag.StringVar(&projectID, "project", projectID, "GCP project identifier")
flag.StringVar(&opts.bucket, "bucket", "", "name of bucket to use; will create a bucket if not provided")
flag.StringVar(&opts.region, "bucket_region", "US-WEST1", "region")
flag.StringVar((*string)(&opts.outType), "output_type", string(outputCloudMonitoring), "output as csv or cloud monitoring format")
flag.IntVar(&opts.numSamples, "samples", 8000, "number of samples to report")
flag.IntVar(&opts.numWorkers, "workers", 16, "number of concurrent workers")
	flag.StringVar((*string)(&opts.api), "api", string(mixedAPIs), "api used to upload/download objects; JSON or XML values will use JSON to upload and XML to download")
	objectRange := flag.String("object_size", fmt.Sprint(1024*kib), "object size in bytes; a range may be given as min..max")
flag.Int64Var(&opts.rangeSize, "range_read_size", 0, "size of the range to read in bytes")
flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes")
flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes")
flag.BoolVar(&opts.useGCSFuseConfig, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation")
flag.StringVar(&opts.endpoint, "endpoint", "", "endpoint to set on Storage Client")
flag.BoolVar(&opts.enableTracing, "tracing", false, "enable trace exporter to Cloud Trace")
flag.Float64Var(&opts.traceSampleRate, "sample_rate", 1.0, "sample rate for traces")
flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes")
flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes")
flag.Int64Var(&opts.minChunkSize, "min_chunksize", useDefault, "min chunksize in bytes")
flag.Int64Var(&opts.maxChunkSize, "max_chunksize", useDefault, "max chunksize in bytes")
flag.IntVar(&opts.connPoolSize, "connection_pool_size", 4, "GRPC connection pool size")
flag.BoolVar(&opts.forceGC, "force_garbage_collection", false, "force garbage collection at the beginning of each upload")
flag.DurationVar(&opts.timeout, "timeout", time.Hour, "timeout")
flag.DurationVar(&opts.timeoutPerOp, "timeout_per_op", time.Minute*5, "timeout per upload/download")
flag.StringVar(&outputFile, "o", "", "file to output results to - if empty, will output to stdout")
flag.BoolVar(&opts.continueOnFail, "continue_on_fail", false, "continue even if a run fails")
	flag.IntVar(&opts.numClients, "clients", 1, "number of storage clients to be used; if the API is Mixed, twice as many clients are created")
flag.IntVar(&opts.workload, "workload", 1, "which workload to run")
flag.IntVar(&opts.numObjectsPerDirectory, "directory_num_objects", 1000, "total number of objects in directory")
	flag.DurationVar(&opts.warmup, "warmup", 0, "time to warm up benchmarks; w1r3 benchmarks will be run for this duration without recording any results")
flag.BoolVar(&serverMode, "server", false, "if true, script runs in cloudprober server mode")
flag.Parse()
if len(projectID) < 1 {
log.Fatalln("Must set a project ID. Use flag -project to specify it.")
}
min, max, isRange := strings.Cut(*objectRange, "..")
var err error
if isRange {
opts.minObjectSize, err = strconv.ParseInt(min, 10, 64)
if err != nil {
log.Fatalln("Could not parse object size")
}
opts.maxObjectSize, err = strconv.ParseInt(max, 10, 64)
if err != nil {
log.Fatalln("Could not parse object size")
}
} else {
opts.objectSize, err = strconv.ParseInt(min, 10, 64)
if err != nil {
log.Fatalln("Could not parse object size")
}
}
}
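// main parses flags, applies the overall timeout as a context deadline,
// creates a bucket and output file if needed, initializes the client pools,
// optionally runs a warmup and enables tracing, and then either serves
// cloudprober requests forever (-server) or runs the requested samples.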
func main() {
log.SetOutput(os.Stderr)
parseFlags()
start := time.Now()
ctx, cancel := context.WithDeadline(context.Background(), start.Add(opts.timeout))
defer cancel()
// Print a message once deadline is exceeded
go func() {
<-ctx.Done()
log.Printf("total configured timeout exceeded")
}()
// Create bucket if necessary
if len(opts.bucket) < 1 {
opts.bucket = randomName(bucketPrefix)
cleanUp := createBenchmarkBucket(opts.bucket, opts)
defer cleanUp()
}
w := os.Stdout
// Create output file if necessary
if outputFile != "" {
f, err := os.Create(outputFile)
if err != nil {
log.Fatalf("Failed to create file %s: %v", outputFile, err)
}
defer f.Close()
w = f
}
// Enable direct path
if opts.api == directPath {
if err := os.Setenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS", "true"); err != nil {
log.Fatalf("error setting direct path env var: %v", err)
}
}
if err := opts.validate(); err != nil {
log.Fatal(err)
}
closePools := initializeClientPools(ctx, opts)
defer closePools()
if err := populateDependencyVersions(); err != nil {
log.Fatalf("populateDependencyVersions: %v", err)
}
if err := warmupW1R3(ctx, opts); err != nil {
log.Fatal(err)
}
if opts.enableTracing {
cleanup := enableTracing(ctx, opts.traceSampleRate)
defer cleanup()
}
if serverMode {
// Server mode blocks forever servicing probe requests
runInServerMode(ctx, opts)
log.Fatalln("server mode exited unexpectedly")
}
err := runSamples(ctx, opts, w)
if outputFile != "" {
// if sending output to a file, we can use stdout for informational logs
fmt.Printf("\nTotal time running: %s\n", time.Since(start).Round(time.Second))
}
if err != nil {
log.Fatalln(err)
}
}
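// benchmark is implemented by each workload in this package (w1r3,
// directoryBenchmark and continuousReads): setup prepares the state a run
// needs, run performs the measured work, and cleanup tears down whatever
// setup and run created.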
type benchmark interface {
setup(context.Context) error
run(context.Context) error
cleanup() error
}
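// randomizedParams records the parameters chosen for a single benchmark
// sample: application buffer size, chunk size, checksum settings, API, and
// range read offset.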
type randomizedParams struct {
appBufferSize int
chunkSize int64
crc32cEnabled bool
md5Enabled bool
api benchmarkAPI
rangeOffset int64
}
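// benchmarkAPI identifies which Cloud Storage API/transport a benchmark uses:
// JSON, XML, gRPC, gRPC over direct path, or a mix.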
type benchmarkAPI string
const (
jsonAPI benchmarkAPI = "JSON"
xmlAPI benchmarkAPI = "XML"
grpcAPI benchmarkAPI = "GRPC"
mixedAPIs benchmarkAPI = "Mixed"
directPath benchmarkAPI = "DirectPath"
)
func (api benchmarkAPI) validate() error {
switch api {
case jsonAPI, grpcAPI, xmlAPI, directPath, mixedAPIs:
return nil
default:
return fmt.Errorf("no such api: %s", api)
}
}
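// writeHeader writes the CSV header row (as chosen by selectHeader) to w.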
func writeHeader(w io.Writer) {
header := selectHeader()
cw := csv.NewWriter(w)
if err := cw.Write(*header); err != nil {
log.Fatalf("error writing csv header: %v", err)
}
cw.Flush()
}
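// writeResultAsCSV writes a single benchmark result to w as one CSV row.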
func writeResultAsCSV(w io.Writer, result *benchmarkResult) {
cw := csv.NewWriter(w)
if err := cw.Write(result.csv()); err != nil {
log.Fatalf("error writing csv: %v", err)
}
cw.Flush()
}
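// writeResultAsCloudMonitoring writes a single benchmark result to w in the
// Cloud Monitoring format, followed by a newline.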
func writeResultAsCloudMonitoring(w io.Writer, result *benchmarkResult) {
_, err := w.Write(result.cloudMonitoring())
if err != nil {
log.Fatalf("cloud monitoring w.Write: %v", err)
}
_, err = w.Write([]byte{'\n'})
if err != nil {
log.Fatalf("cloud monitoring w.Write: %v", err)
}
}
// enableTracing turns on Open Telemetry tracing with export to Cloud Trace.
func enableTracing(ctx context.Context, sampleRate float64) func() {
exporter, err := texporter.New(texporter.WithProjectID(projectID))
if err != nil {
log.Fatalf("texporter.New: %v", err)
}
// Identify your application using resource detection
res, err := resource.New(ctx,
// Use the GCP resource detector to detect information about the GCP platform
resource.WithDetectors(gcp.NewDetector()),
// Keep the default detectors
resource.WithTelemetrySDK(),
// Add your own custom attributes to identify your application
resource.WithAttributes(
semconv.ServiceName(tracerName),
),
)
if err != nil {
log.Fatalf("resource.New: %v", err)
}
// Create trace provider with the exporter.
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(sampleRate)),
)
otel.SetTracerProvider(tp)
return func() {
tp.ForceFlush(ctx)
if err := tp.Shutdown(context.Background()); err != nil {
log.Fatal(err)
}
}
}
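// startRecordingResults initializes the shared results channel and starts a
// goroutine on g that drains it, writing each result in the requested output
// format until the channel is closed.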
func startRecordingResults(w io.Writer, g *errgroup.Group, oType outputType) {
// buffer channel so we don't block on printing results
results = make(chan benchmarkResult, 100)
if oType == outputCSV {
writeHeader(w)
}
// start recording results
g.Go(func() error {
for {
result, ok := <-results
if !ok {
break
}
if oType == outputCSV {
writeResultAsCSV(w, &result)
} else if oType == outputCloudMonitoring {
writeResultAsCloudMonitoring(w, &result)
}
}
return nil
})
}
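// multiError collects errors from individual benchmark runs so that a single
// failed run or cleanup does not abort the remaining samples.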
type multiError []error
func (e multiError) Error() string {
var sb strings.Builder
for _, err := range e {
sb.WriteString(err.Error())
sb.WriteRune('\n')
}
return sb.String()
}
// runSamples runs benchmarks until the required number of samples has been
// collected and recorded, a setup failure occurs, or the deadline is reached.
func runSamples(ctx context.Context, opts *benchmarkOptions, out io.Writer) error {
	multiErr := multiError{}
	var multiErrMu sync.Mutex // guards multiErr; the benchmark goroutines below append to it concurrently
deadline, _ := ctx.Deadline()
// Record results async
recordResultGroup, _ := errgroup.WithContext(ctx)
startRecordingResults(out, recordResultGroup, opts.outType)
// Create a group to run benchmarks
benchGroup, ctx := errgroup.WithContext(ctx)
// Select concurrency
var concurrentBenchmarkRuns int
switch opts.workload {
default:
concurrentBenchmarkRuns = opts.numWorkers
case 6, 9:
// Directory benchmarks parallelize on the object level, so only run one
// benchmark at a time
concurrentBenchmarkRuns = 1
}
benchGroup.SetLimit(concurrentBenchmarkRuns)
// Run benchmarks until the required number of samples is collected, or we
// hit the deadline
for i := 0; i < opts.numSamples && !time.Now().After(deadline); i++ {
benchGroup.Go(func() error {
var benchmark benchmark
switch opts.workload {
default:
benchmark = &w1r3{opts: opts, bucketName: opts.bucket}
case 6:
benchmark = &directoryBenchmark{opts: opts, bucketName: opts.bucket, numWorkers: opts.numWorkers}
case 9:
benchmark = &continuousReads{opts: opts, bucketName: opts.bucket, numWorkers: opts.numWorkers}
}
if err := benchmark.setup(ctx); err != nil {
// If setup fails, it will probably continue failing.
// Returning the error here will cancel the context, which will
// stop the benchmarking.
return fmt.Errorf("run setup failed: %v", err)
}
			if err := benchmark.run(ctx); err != nil {
				// If a run fails, we continue, as it could be a temporary issue.
				multiErrMu.Lock()
				multiErr = append(multiErr, fmt.Errorf("run failed: %v", err))
				multiErrMu.Unlock()
			}
			if err := benchmark.cleanup(); err != nil {
				// If cleanup fails, we continue, as a failure here is not critical.
				// Cleanup may be expected to fail if there is an issue with the run.
				multiErrMu.Lock()
				multiErr = append(multiErr, fmt.Errorf("run cleanup failed: %v", err))
				multiErrMu.Unlock()
			}
return nil
})
}
if err := benchGroup.Wait(); err != nil {
multiErr = append(multiErr, err)
}
close(results)
recordResultGroup.Wait()
if len(multiErr) > 0 {
return multiErr
}
return nil
}
// runInServerMode blocks forever servicing probe requests.
// The number of samples requested is ignored in this mode.
// Timeouts must be set correctly at the probe level to ensure that requests
// are not serviced concurrently.
func runInServerMode(ctx context.Context, opts *benchmarkOptions) {
// serverutils.Serve processes one request at a time sequentially; so we
// always output the results of a sample before beginning another.
// Therefore, this channel contains a maximum of 4 results (for w1r3) at once.
results = make(chan benchmarkResult, 4)
serverutils.Serve(func(request *epb.ProbeRequest, reply *epb.ProbeReply) {
// Set the ctx so that we stop processing this sample if we reach the
// timeout set in the prober's configuration.
timeLimit := time.Millisecond * time.Duration(request.GetTimeLimit())
ctx, cancel := context.WithTimeout(ctx, timeLimit)
defer cancel()
var benchmark benchmark
benchmark = &w1r3{opts: opts, bucketName: opts.bucket}
if opts.workload == 6 {
benchmark = &directoryBenchmark{opts: opts, bucketName: opts.bucket, numWorkers: opts.numWorkers}
}
if err := benchmark.setup(ctx); err != nil {
reply.ErrorMessage = proto.String(err.Error())
return
}
if err := benchmark.run(ctx); err != nil {
reply.ErrorMessage = proto.String(fmt.Errorf("run failed: %v", err).Error())
benchmark.cleanup()
return
}
if err := benchmark.cleanup(); err != nil {
reply.ErrorMessage = proto.String(fmt.Errorf("run cleanup failed: %v", err).Error())
return
}
numResults := 4 // w1r3 will output 1 result per read/write (1 write, 3 reads)
if opts.workload == 6 {
numResults = 2 // workload 6 only outputs 2 results (1 directory read, 1 write)
}
// Synchronously gather results
payload := new(strings.Builder)
for i := 0; i < numResults; i++ {
result := <-results
writeResultAsCloudMonitoring(payload, &result)
}
reply.Payload = proto.String(payload.String())
})
}
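// outputType selects how benchmark results are emitted: as CSV rows or in the
// Cloud Monitoring format.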
type outputType string
const (
outputCSV outputType = "csv"
outputCloudMonitoring outputType = "cloud-monitoring"
)
func (o outputType) validate() error {
switch o {
case outputCSV, outputCloudMonitoring:
return nil
default:
return fmt.Errorf("could not parse output type: %s", o)
}
}