// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"path"
	"runtime"
	"time"

	"cloud.google.com/go/storage"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)
// w1r3, or "write one, read three", is a benchmark that uploads a randomly
// generated object once and then downloads it three times. The object is
// downloaded more than once to compare "cold" (just uploaded) vs. "hot" data.
//
// SET UP
// - Initialize structs to populate with the benchmark results.
// - Select a random API to use to upload and download the object, unless an
//   API is set on the command line.
// - Select a random size for the object that will be uploaded; this size will
//   be between two values configured on the command line.
// - Create an object of that size on disk and fill it with random contents.
// - Select, for the upload and each download separately, the following parameters:
//   - the application buffer size set on the command line,
//   - the chunk size (uploads only) set on the command line,
//   - whether the client library will perform CRC32C and/or MD5 hashes on the data.
//
// BENCHMARK
// - Grab a storage client from the pool.
// - Take a snapshot of the current memory stats.
// - Upload the object that was created in the set up, capturing the time taken.
//   This includes opening the file, writing the object, and verifying the hash
//   (if applicable).
// - Take another snapshot of memory stats.
// - Download the same object three times sequentially with the same client,
//   capturing the elapsed time and taking memory snapshots before and after
//   each download.
//
// An illustrative driver sketch for this type appears after the struct definition below.
type w1r3 struct {
	opts                   *benchmarkOptions
	bucketName, objectName string
	directoryPath          string
	objectPath             string
	writeResult            *benchmarkResult
	readResults            []*benchmarkResult
	isWarmup               bool // if true, results should not be recorded
}
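
// exampleW1R3Lifecycle is a hypothetical driver sketch, not part of the benchmark
// CLI, showing the intended lifecycle of a single w1r3 pass as described in the
// doc comment above: setup creates the local test file and selects parameters,
// run performs the upload and the three downloads, and cleanup removes local and
// remote artifacts. The function name and signature are made up for illustration.
func exampleW1R3Lifecycle(ctx context.Context, o *benchmarkOptions, bucket string) error {
	w := &w1r3{opts: o, bucketName: bucket}
	if err := w.setup(ctx); err != nil {
		return fmt.Errorf("setup: %w", err)
	}
	// Always attempt cleanup, even if the run fails, so temp files and the
	// uploaded object are not leaked.
	defer func() {
		if err := w.cleanup(); err != nil {
			fmt.Fprintf(os.Stderr, "cleanup: %v\n", err)
		}
	}()
	return w.run(ctx)
}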
func (r *w1r3) setup(ctx context.Context) error {
	// Check for context cancellation
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Select API first as it will be the same for all writes/reads
	api := opts.api
	if api == mixedAPIs {
		switch rand.Intn(4) {
		case 0:
			api = xmlAPI
		case 1:
			api = jsonAPI
		case 2:
			api = grpcAPI
		case 3:
			api = directPath
		}
	}

	// Select object size
	objectSize := opts.objectSize
	if objectSize == 0 {
		objectSize = randomInt64(opts.minObjectSize, opts.maxObjectSize)
	}
	r.writeResult = &benchmarkResult{objectSize: objectSize}
	r.readResults = []*benchmarkResult{{objectSize: objectSize}, {objectSize: objectSize}, {objectSize: objectSize}}

	// Select write params
	r.writeResult.selectWriteParams(*r.opts, api)

	// Select read params
	firstRead := r.readResults[0]
	firstRead.isRead = true
	firstRead.readIteration = 0
	firstRead.selectReadParams(*r.opts, api)

	// We want the reads to be similar, so we copy the params from the first read
	for i, res := range r.readResults[1:] {
		res.isRead = true
		res.readIteration = i + 1
		res.copyParams(firstRead)
	}

	// Make a temp dir for this run
	dir, err := os.MkdirTemp("", "benchmark-experiment-")
	if err != nil {
		return err
	}

	// Create contents
	objectPath, err := generateRandomFile(dir, objectSize)
	if err != nil {
		return fmt.Errorf("generateRandomFile: %w", err)
	}

	r.directoryPath = dir
	r.objectPath = objectPath
	r.objectName = path.Base(objectPath)
	return nil
}
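
// exampleGenerateRandomFile is a hypothetical sketch of what a helper like
// generateRandomFile (defined elsewhere in this package) might look like: it
// creates a file of the requested size in dir and fills it with pseudo-random
// bytes. It is illustrative only; setup above uses the real helper.
func exampleGenerateRandomFile(dir string, size int64) (string, error) {
	f, err := os.CreateTemp(dir, "benchmark-obj-")
	if err != nil {
		return "", err
	}
	defer f.Close()

	// math/rand is sufficient here; the payload only needs to be arbitrary,
	// not cryptographically random.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	buf := make([]byte, 32*1024)
	var written int64
	for written < size {
		n := int64(len(buf))
		if remaining := size - written; remaining < n {
			n = remaining
		}
		rng.Read(buf[:n])
		if _, err := f.Write(buf[:n]); err != nil {
			return "", err
		}
		written += n
	}
	return f.Name(), nil
}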
// cleanup deletes objects on disk and in GCS. It does not accept a context, as
// it should run to completion to ensure full cleanup of resources.
func (r *w1r3) cleanup() error {
	// Clean up the temp dir
	if err := os.RemoveAll(r.directoryPath); err != nil {
		return err
	}

	// Delete the uploaded object
	c := nonBenchmarkingClients.Get()
	o := c.Bucket(r.bucketName).Object(r.objectName).Retryer(storage.WithPolicy(storage.RetryAlways))
	return o.Delete(context.Background())
}
func (r *w1r3) run(ctx context.Context) error {
	// Use the same client for the write and the reads, as the API is the same
	client := getClient(ctx, r.readResults[0].params.api)

	var span trace.Span
	ctx, span = otel.GetTracerProvider().Tracer(tracerName).Start(ctx, "w1r3")
	span.SetAttributes(
		attribute.KeyValue{Key: "workload", Value: attribute.StringValue("w1r3")},
		attribute.KeyValue{Key: "api", Value: attribute.StringValue(string(r.opts.api))},
		attribute.KeyValue{Key: "object_size", Value: attribute.Int64Value(r.opts.objectSize)},
	)
	defer span.End()

	// Upload
	err := runOneSample(r.writeResult, func() (time.Duration, error) {
		return uploadBenchmark(ctx, uploadOpts{
			client:              client,
			params:              r.writeResult.params,
			bucket:              r.bucketName,
			object:              r.objectName,
			useDefaultChunkSize: opts.minChunkSize == useDefault || opts.maxChunkSize == useDefault,
			objectPath:          r.objectPath,
			timeout:             r.opts.timeoutPerOp,
		})
	}, r.isWarmup)

	// Do not attempt to read from a failed upload
	if err != nil {
		return fmt.Errorf("upload: %w", err)
	}

	// Downloads
	for i := 0; i < 3; i++ {
		// Download the full object if no rangeSize is specified (a range
		// length of -1 reads to the end of the object)
		rangeStart := int64(0)
		rangeLength := int64(-1)

		if opts.rangeSize > 0 {
			rangeStart = r.readResults[i].params.rangeOffset
			rangeLength = opts.rangeSize
		}
		r.readResults[i].readOffset = rangeStart

		err = runOneSample(r.readResults[i], func() (time.Duration, error) {
			return downloadBenchmark(ctx, downloadOpts{
				client:              client,
				objectSize:          r.readResults[i].objectSize,
				bucket:              r.bucketName,
				object:              r.objectName,
				rangeStart:          rangeStart,
				rangeLength:         rangeLength,
				downloadToDirectory: r.directoryPath,
				timeout:             r.opts.timeoutPerOp,
			})
		}, r.isWarmup)
		if err != nil {
			// Stop additional reads if one fails, as the iteration number would be off
			return fmt.Errorf("download[%d]: %w", i, err)
		}
	}

	return nil
}
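
// exampleTracedDownload is a hypothetical sketch, not used by run above, of how a
// per-download child span could wrap each read; run records a single "w1r3" span
// for the whole workload instead. The span name "w1r3-read" and the
// "read_iteration" attribute are made up for illustration.
func exampleTracedDownload(ctx context.Context, iteration int, do func() (time.Duration, error)) (time.Duration, error) {
	_, span := otel.GetTracerProvider().Tracer(tracerName).Start(ctx, "w1r3-read")
	defer span.End()
	span.SetAttributes(attribute.KeyValue{Key: "read_iteration", Value: attribute.IntValue(iteration)})
	return do()
}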
func runOneSample(result *benchmarkResult, doOp func() (time.Duration, error), isWarmup bool) error {
	memStats := &runtime.MemStats{}

	// If the option is specified, run the garbage collector before collecting
	// memory statistics and starting the timer on the benchmark. This can be
	// used to compare running each benchmark on a "blank slate" vs. letting
	// garbage accumulate organically across runs.
	if opts.forceGC {
		runtime.GC()
	}

	runtime.ReadMemStats(memStats)
	result.startMem = *memStats

	result.start = time.Now()
	timeTaken, err := doOp()

	runtime.ReadMemStats(memStats)
	result.endMem = *memStats

	result.err = err
	result.elapsedTime = timeTaken

	if errorIsDeadLineExceeded(err) {
		result.timedOut = true
	}

	if !isWarmup {
		results <- *result
	}

	return err
}
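
// exampleDrainResults is a hypothetical sketch of a consumer for the samples that
// runOneSample sends on the package-level results channel; the real aggregation
// logic lives elsewhere in this package, so the channel is taken as a parameter
// here rather than assuming its declaration. It also shows how the before/after
// runtime.MemStats snapshots can be turned into a per-sample allocation delta.
func exampleDrainResults(samples <-chan benchmarkResult, stop <-chan struct{}) {
	for {
		select {
		case res := <-samples:
			// TotalAlloc is cumulative, so the difference is the bytes
			// allocated while the operation ran.
			allocated := res.endMem.TotalAlloc - res.startMem.TotalAlloc
			status := "ok"
			if res.err != nil {
				status = "error: " + res.err.Error()
			}
			fmt.Printf("size=%d read=%t elapsed=%s allocDelta=%dB %s\n",
				res.objectSize, res.isRead, res.elapsedTime, allocated, status)
		case <-stop:
			return
		}
	}
}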