// blob: accfe5f636c5fce11de8eebcfd4dae03e01ed57a [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"log"
"runtime"
"strconv"
"strings"
"time"
"cloud.google.com/go/storage"
)
// benchmarkResult holds the inputs and measurements of a single benchmark
// operation. It can be rendered as a cloud monitoring line (cloudMonitoring)
// or as a CSV record (csv).
type benchmarkResult struct {
	// objectSize is the size of the object the operation worked on. It is the
	// throughput numerator when neither a directory nor a read range applies.
	objectSize int64
	// directorySize is > 0 if the benchmark is on a directory; it then
	// replaces objectSize in the throughput calculation and in the output.
	directorySize int64
	// readOffset is not referenced by the methods visible in this part of the
	// file. NOTE(review): confirm its use against the callers.
	readOffset int64
	// params are the parameters chosen for this run by selectReadParams,
	// selectWriteParams or copyParams.
	params randomizedParams
	// isRead reports whether this is a read; otherwise the operation is
	// reported as "WRITE".
	isRead bool
	// readIteration is the number printed in the "READ[n]" operation label.
	readIteration int
	// start is reported as StartTime (Unix seconds) in the CSV output;
	// EndTime is derived as start + elapsedTime.
	start time.Time
	// elapsedTime is the throughput denominator and is reported in
	// microseconds in the CSV output.
	elapsedTime time.Duration
	// err, when non-nil, makes the reported status "FAIL".
	err error
	// timedOut makes the reported status "TIMEOUT", taking precedence over
	// "FAIL".
	timedOut bool
	// startMem and endMem are memory statistics snapshots, presumably taken
	// before and after the operation (verify against the caller). The CSV
	// output reports heap/stack figures from startMem and the difference in
	// Mallocs between the two.
	startMem runtime.MemStats
	endMem   runtime.MemStats
	// metricName is the metric name used in the cloud monitoring output;
	// defaults to throughput when empty.
	metricName string
}
// calculateThroughput returns the amount of data attributed to this result
// divided by the elapsed time in seconds.
//
// The amount of data is chosen as follows:
//   - ranged read on a directory: opts.rangeSize * opts.numObjectsPerDirectory
//   - ranged read on a single object: opts.rangeSize
//   - any other operation on a directory: br.directorySize
//   - otherwise: br.objectSize
//
// A read counts as ranged when opts.rangeSize > 0. No guard is applied to the
// elapsed time, so a zero duration yields an infinite or NaN result.
func (br *benchmarkResult) calculateThroughput() float64 {
	seconds := br.elapsedTime.Seconds()
	onDirectory := br.directorySize > 0
	rangedRead := br.isRead && opts.rangeSize > 0

	var transferred float64
	switch {
	case rangedRead && onDirectory:
		// Every object in the directory contributes one range.
		transferred = float64(opts.rangeSize) * float64(opts.numObjectsPerDirectory)
	case rangedRead:
		transferred = float64(opts.rangeSize)
	case onDirectory:
		transferred = float64(br.directorySize)
	default:
		transferred = float64(br.objectSize)
	}
	return transferred / seconds
}
// selectReadParams fills br.params for a read benchmark against the given API.
//
// The application buffer size is opts.readBufferSize unless that equals
// useDefault, in which case it becomes 4 KiB for xmlAPI/jsonAPI and 32 KiB for
// grpcAPI/directPath (for any other api value it is left as useDefault). The
// range offset is drawn with randomInt64(opts.minReadOffset,
// opts.maxReadOffset).
func (br *benchmarkResult) selectReadParams(opts benchmarkOptions, api benchmarkAPI) {
	bufferSize := opts.readBufferSize
	if bufferSize == useDefault {
		if api == xmlAPI || api == jsonAPI {
			bufferSize = 4 << 10 // default for HTTP
		} else if api == grpcAPI || api == directPath {
			bufferSize = 32 << 10 // default for GRPC
		}
	}

	br.params = randomizedParams{
		appBufferSize: bufferSize,
		// crc32c is always verified in the Go GCS library
		crc32cEnabled: true,
		// we only need one integrity validation
		md5Enabled:  false,
		api:         api,
		rangeOffset: randomInt64(opts.minReadOffset, opts.maxReadOffset),
	}
}
// selectWriteParams fills br.params for a write benchmark against the given
// API. xmlAPI is mapped to jsonAPI because there is no XML implementation for
// writes.
//
// The MD5 and CRC32C flags come from randomOf3 (its first result is unused
// here) and the chunk size from randomInt64(opts.minChunkSize,
// opts.maxChunkSize). If either chunk size bound equals useDefault, the chunk
// size is instead read from a freshly created storage.Writer, i.e. the
// library default; if the client cannot be created the error is logged and
// the randomly chosen value is kept.
//
// The application buffer size is opts.writeBufferSize unless that equals
// useDefault, in which case it becomes 4 KiB for HTTP APIs and 32 KiB for
// grpcAPI/directPath.
func (br *benchmarkResult) selectWriteParams(opts benchmarkOptions, api benchmarkAPI) {
	// There is no XML implementation for writes
	if api == xmlAPI {
		api = jsonAPI
	}

	_, doMD5, doCRC32C := randomOf3()
	br.params = randomizedParams{
		appBufferSize: opts.writeBufferSize,
		chunkSize:     randomInt64(opts.minChunkSize, opts.maxChunkSize),
		crc32cEnabled: doCRC32C,
		md5Enabled:    doMD5,
		api:           api,
	}

	if opts.minChunkSize == useDefault || opts.maxChunkSize == useDefault {
		// Get a writer on a non-existing object to check the default
		// chunksize. The writer is only inspected; it is never written to.
		c, err := storage.NewClient(context.Background())
		if err != nil {
			log.Printf("storage.NewClient: %v", err)
		} else {
			w := c.Bucket("").Object("").NewWriter(context.Background())
			br.params.chunkSize = int64(w.ChunkSize)
			// The client exists only for this lookup; close it so that
			// repeated calls do not accumulate unreleased clients.
			if err := c.Close(); err != nil {
				log.Printf("storage.Client.Close: %v", err)
			}
		}
	}

	if opts.writeBufferSize == useDefault {
		switch api {
		case xmlAPI, jsonAPI:
			br.params.appBufferSize = 4 << 10 // default for HTTP
		case grpcAPI, directPath:
			br.params.appBufferSize = 32 << 10 // default for GRPC
		}
	}
}
// copyParams replaces br.params with a copy of the parameters held by from.
//
// Only appBufferSize, chunkSize, crc32cEnabled, md5Enabled, api and
// rangeOffset are carried over; any other field randomizedParams may have is
// left at its zero value.
func (br *benchmarkResult) copyParams(from *benchmarkResult) {
	src := &from.params

	var dup randomizedParams
	dup.appBufferSize = src.appBufferSize
	dup.chunkSize = src.chunkSize
	dup.crc32cEnabled = src.crc32cEnabled
	dup.md5Enabled = src.md5Enabled
	dup.api = src.api
	dup.rangeOffset = src.rangeOffset

	br.params = dup
}
// cloudMonitoring converts the result to cloud monitoring format: a single
// line of the form
//
//	<metric>{<label>=<value>,<label>=<value>,...} <throughput>
//
// The metric name is br.metricName, or "throughput" when that is empty. String
// values are wrapped in double quotes; int and bool values are printed bare.
// The trailing number is calculateThroughput formatted with two decimals.
func (br *benchmarkResult) cloudMonitoring() []byte {
	operation := "WRITE"
	if br.isRead {
		operation = fmt.Sprintf("READ[%d]", br.readIteration)
	}

	// A timeout is reported in preference to a plain failure.
	var status string
	switch {
	case br.timedOut:
		status = "TIMEOUT"
	case br.err != nil:
		status = "FAIL"
	default:
		status = "OK"
	}

	metric := "throughput"
	if br.metricName != "" {
		metric = br.metricName
	}

	var labels []string
	// quoted appends a label for values of type string.
	quoted := func(name string, value any) {
		labels = append(labels, fmt.Sprintf("%s=\"%v\"", name, value))
	}
	// bare appends a label for values of type int, bool.
	bare := func(name string, value any) {
		labels = append(labels, fmt.Sprintf("%s=%v", name, value))
	}

	quoted("library", "go")
	quoted("api", br.params.api)
	quoted("op", operation)

	if br.directorySize > 0 {
		bare("directory_size", br.directorySize)
		bare("num_objects", opts.numObjectsPerDirectory)
	} else {
		// object_size is not reliable for directories as they can have a mix
		// of object sizes; therefore, it is only output for single objects.
		bare("object_size", br.objectSize)
	}

	if br.isRead {
		// Range details are only meaningful for ranged reads.
		if opts.rangeSize > 0 {
			bare("transfer_size", opts.rangeSize)
			bare("transfer_offset", br.params.rangeOffset)
		}
	} else {
		bare("chunksize", br.params.chunkSize)
	}

	bare("workers", opts.numWorkers)
	bare("crc32c_enabled", br.params.crc32cEnabled)
	bare("md5_enabled", br.params.md5Enabled)
	quoted("bucket_name", opts.bucket)
	quoted("status", status)
	if br.err != nil {
		// Double quotes are stripped so the message cannot end the quoted
		// value early.
		quoted("failure_msg", strings.ReplaceAll(br.err.Error(), "\"", ""))
	}
	bare("app_buffer_size", br.params.appBufferSize)
	if br.params.api == directPath || br.params.api == grpcAPI {
		bare("connection_pool_size", opts.connPoolSize)
	}
	if len(opts.endpoint) > 0 {
		quoted("endpoint", opts.endpoint)
	}
	quoted("code_version", codeVersion)
	quoted("go_version", goVersion)

	for dep, ver := range dependencyVersions {
		// Cloud monitoring only allows letters, numbers and underscores in
		// keys: dots are dropped and slashes become underscores (no other
		// character is rewritten here).
		key := strings.ReplaceAll(strings.ReplaceAll(dep, ".", ""), "/", "_")
		quoted(key, ver)
	}

	rate := strconv.FormatFloat(br.calculateThroughput(), 'f', 2, 64)
	return []byte(metric + "{" + strings.Join(labels, ",") + "} " + rate)
}
// csv converts the result to csv writing format (ie. a slice of strings).
//
// Every known column value is computed into a lookup table, and the returned
// slice contains the values for the columns named by selectHeader, in that
// order. A column with no value for this result is emitted as an empty string.
func (br *benchmarkResult) csv() []string {
	// A timeout is reported in preference to a plain failure.
	var status string
	switch {
	case br.timedOut:
		status = "TIMEOUT"
	case br.err != nil:
		status = "FAIL"
	default:
		status = "OK"
	}

	operation := "WRITE"
	if br.isRead {
		operation = fmt.Sprintf("READ[%d]", br.readIteration)
	}

	fields := map[string]string{
		"Op":            operation,
		"AppBufferSize": strconv.Itoa(br.params.appBufferSize),
		"LibBufferSize": strconv.Itoa(int(br.params.chunkSize)),
		"Crc32cEnabled": strconv.FormatBool(br.params.crc32cEnabled),
		"MD5Enabled":    strconv.FormatBool(br.params.md5Enabled),
		"ApiName":       string(br.params.api),
		"ElapsedTimeUs": strconv.FormatInt(br.elapsedTime.Microseconds(), 10),
		"CpuTimeUs":     "-1", // TODO: record cpu time
		"Status":        status,
		"HeapSys":       strconv.FormatUint(br.startMem.HeapSys, 10),
		"HeapAlloc":     strconv.FormatUint(br.startMem.HeapAlloc, 10),
		"StackInUse":    strconv.FormatUint(br.startMem.StackInuse, 10),
		// The real heap difference is not reported, to avoid large numbers
		// messing up BigQuery imports.
		"HeapAllocDiff":      "-1",
		"MallocsDiff":        strconv.FormatUint(br.endMem.Mallocs-br.startMem.Mallocs, 10),
		"ConnectionPoolSize": strconv.Itoa(opts.connPoolSize),
		"StartTime":          strconv.FormatInt(br.start.Unix(), 10),
		"EndTime":            strconv.FormatInt(br.start.Add(br.elapsedTime).Unix(), 10),
		"NumWorkers":         strconv.Itoa(opts.numWorkers),
		"CodeVersion":        codeVersion,
		"BucketName":         opts.bucket,
	}

	// Directory runs report the directory, single-object runs the object.
	if br.directorySize > 0 {
		fields["DirectorySize"] = strconv.FormatInt(br.directorySize, 10)
		fields["NumObjects"] = strconv.Itoa(opts.numObjectsPerDirectory)
	} else {
		fields["ObjectSize"] = strconv.FormatInt(br.objectSize, 10)
	}

	if opts.rangeSize > 0 {
		fields["TransferSize"] = strconv.Itoa(int(opts.rangeSize))
		fields["TransferOffset"] = strconv.Itoa(int(br.params.rangeOffset))
	}

	columns := *selectHeader()
	row := make([]string, len(columns))
	for i, column := range columns {
		row[i] = fields[column]
	}
	return row
}
// CSV column layouts. selectHeader picks one of them, and csv emits one value
// per listed column in the listed order, so the names here must match the
// keys that csv records.
var (
	// csvHeader is the layout used when neither of the other two applies.
	csvHeader = []string{
		"Op", "ObjectSize", "AppBufferSize", "LibBufferSize",
		"Crc32cEnabled", "MD5Enabled", "ApiName",
		"ElapsedTimeUs", "CpuTimeUs", "Status",
		"HeapSys", "HeapAlloc", "StackInUse", "HeapAllocDiff", "MallocsDiff",
		"ConnectionPoolSize", "StartTime", "EndTime", "NumWorkers",
		"CodeVersion", "BucketName",
	}
	// csvHeaderRangeReads is csvHeader plus the TransferSize and
	// TransferOffset columns; selected when opts.rangeSize > 0 (and the
	// workload is not 6).
	csvHeaderRangeReads = []string{
		"Op", "ObjectSize", "TransferSize", "TransferOffset",
		"AppBufferSize", "LibBufferSize",
		"Crc32cEnabled", "MD5Enabled", "ApiName",
		"ElapsedTimeUs", "CpuTimeUs", "Status",
		"HeapSys", "HeapAlloc", "StackInUse", "HeapAllocDiff", "MallocsDiff",
		"ConnectionPoolSize", "StartTime", "EndTime", "NumWorkers",
		"CodeVersion", "BucketName",
	}
	// csvHeaderWorkload6 replaces ObjectSize with DirectorySize and
	// NumObjects; selected when opts.workload == 6.
	csvHeaderWorkload6 = []string{
		"Op", "DirectorySize", "NumObjects", "AppBufferSize", "LibBufferSize",
		"Crc32cEnabled", "MD5Enabled", "ApiName",
		"ElapsedTimeUs", "CpuTimeUs", "Status",
		"HeapSys", "HeapAlloc", "StackInUse", "HeapAllocDiff", "MallocsDiff",
		"ConnectionPoolSize", "StartTime", "EndTime", "NumWorkers",
		"CodeVersion", "BucketName",
	}
)
func selectHeader() *[]string {
header := &csvHeader
if opts.workload == 6 {
header = &csvHeaderWorkload6
} else if opts.rangeSize > 0 {
header = &csvHeaderRangeReads
}
return header
}