blob: aabc63bf8017b3ef8287cd5981aca5a014382d00 [file] [log] [blame] [edit]
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand/v2"
"os"
"slices"
"strconv"
"time"
"cloud.google.com/go/spanner"
traceExporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
/*
**
Employees table schema:
CREATE TABLE Employees(
ID int64,
NAME STRING(50)
) PRIMARY KEY(ID)
*/
const (
selectQuery = "SELECT ID from EMPLOYEES WHERE ID = @p1"
totalRecords = 100000
tableName = "EMPLOYEES"
)
type transactionType string
const (
read transactionType = "READ"
query = "QUERY"
)
type cloudEnvironment string
const (
production cloudEnvironment = "PRODUCTION"
devel = "DEVEL"
)
var spannerHosts = map[cloudEnvironment]string{
production: "spanner.googleapis.com:443",
devel: "staging-wrenchworks.sandbox.googleapis.com:443",
}
var monitoringHosts = map[cloudEnvironment]string{
production: "monitoring.googleapis.com:443",
devel: "staging-monitoring.sandbox.googleapis.com:443",
}
var cloudTracingHosts = map[cloudEnvironment]string{
production: "cloudtrace.googleapis.com:443",
devel: "staging-cloudtrace.sandbox.googleapis.com:443",
}
type benchmarkingConfiguration struct {
warmUpTime int8 // in minutes, default 7 minutes
executionTime int8 // in minutes, default 30 minutes
waitBetweenRequests uint8 // in ms, default 5 ms
staleness int8 // in seconds, default 15s
parsedTransactionType transactionType // default read
tracesEnabled bool // default false
disableNativeMetrics bool // default false
traceSamplingFraction float32 // default 0.5
}
func getDefaultBenchmarkingConfiguration() benchmarkingConfiguration {
return benchmarkingConfiguration{warmUpTime: 7, executionTime: 30, waitBetweenRequests: 5, staleness: 15, parsedTransactionType: read, tracesEnabled: false, disableNativeMetrics: false, traceSamplingFraction: 0.5}
}
func main() {
ctx := context.Background()
project := os.Getenv("SPANNER_CLIENT_BENCHMARK_GOOGLE_CLOUD_PROJECT")
instance := os.Getenv("SPANNER_CLIENT_BENCHMARK_SPANNER_INSTANCE")
database := os.Getenv("SPANNER_CLIENT_BENCHMARK_SPANNER_DATABASE")
if project == "" || instance == "" || database == "" {
fmt.Println(`You must set all the environment variables SPANNER_CLIENT_BENCHMARK_GOOGLE_CLOUD_PROJECT,
SPANNER_CLIENT_BENCHMARK_SPANNER_INSTANCE and SPANNER_CLIENT_BENCHMARK_SPANNER_DATABASE`)
return
}
environment := parseCloudEnvironment(os.Getenv("SPANNER_CLIENT_BENCHMARK_CLOUD_ENVIRONMENT"))
host := spannerHosts[environment]
monitoringHost := monitoringHosts[environment]
if err := setMonitoringHost(monitoringHost); err != nil {
fmt.Println(err)
return
}
bc := getDefaultBenchmarkingConfiguration()
if err := parseCommandLineArguments(os.Args, &bc); err != nil {
fmt.Println(err)
return
}
db := fmt.Sprintf("projects/%v/instances/%v/databases/%v", project, instance, database)
fmt.Printf("\nRunning benchmark on %v\nEnvironment: %v\nWarm up time: %v mins\nExecution Time: %v mins\nWait Between Requests: %v ms\nStaleness: %v secs\nTraces Enabled: %v\nDisable Native Metrics: %v\nTrace Sampling Fraction: %v\nTransaction Type: %v\n\n", db, environment, bc.warmUpTime, bc.executionTime, bc.waitBetweenRequests, bc.staleness, bc.tracesEnabled, bc.disableNativeMetrics, bc.traceSamplingFraction, bc.parsedTransactionType)
if bc.tracesEnabled {
enableTracingWithCloudTraceExporter(project, cloudTracingHosts[environment])
}
client, err := spanner.NewClientWithConfig(ctx, db, spanner.ClientConfig{DisableNativeMetrics: bc.disableNativeMetrics}, option.WithEndpoint(host))
if err != nil {
return
}
defer client.Close()
err = warmUp(ctx, client, bc.warmUpTime, bc.staleness, bc.parsedTransactionType)
if err != nil {
fmt.Println(err)
return
}
latencies, err := runBenchmark(ctx, client, bc.executionTime, bc.staleness, bc.waitBetweenRequests, bc.parsedTransactionType)
if err != nil {
fmt.Println(err)
return
}
slices.Sort(latencies)
fmt.Printf("\nResults\np50 %v\np95 %v\np99 %v\n", percentiles(0.5, latencies),
percentiles(0.95, latencies), percentiles(0.99, latencies))
}
func warmUp(ctx context.Context, client *spanner.Client, warmupTime int8, staleness int8, transactionType transactionType) error {
endTime := time.Now().Local().Add(time.Minute * time.Duration(warmupTime))
go runTimer(endTime, "Remaining warmup time")
for {
if time.Now().Local().After(endTime) {
break
}
_, err := execute(ctx, transactionType, client, staleness)
if err != nil {
fmt.Println(err)
return err
}
}
return nil
}
func runBenchmark(ctx context.Context, client *spanner.Client, executionTime int8, staleness int8, waitBetweenRequests uint8, transactionType transactionType) ([]int64, error) {
endTime := time.Now().Local().Add(time.Minute * time.Duration(executionTime))
go runTimer(endTime, "Remaining operation time")
var latencies []int64
for {
if time.Now().Local().After(endTime) {
break
}
duration, err := execute(ctx, transactionType, client, staleness)
if err != nil {
fmt.Println(err)
return make([]int64, 0), err
}
latencies = append(latencies, duration.Microseconds())
time.Sleep(time.Millisecond * getRandomWaitTime(waitBetweenRequests))
}
return latencies, nil
}
func execute(ctx context.Context, transactionType transactionType, client *spanner.Client, staleness int8) (time.Duration, error) {
switch transactionType {
case query:
return executeQuery(ctx, client, staleness)
case read:
return executeRead(ctx, client, staleness)
default:
return 0, errors.New("invalid transaction type")
}
}
func executeQuery(ctx context.Context, client *spanner.Client, staleness int8) (time.Duration, error) {
start := time.Now()
iter := client.Single().WithTimestampBound(spanner.ExactStaleness(time.Second*time.Duration(staleness))).Query(ctx, spanner.Statement{SQL: selectQuery, Params: map[string]interface{}{
"p1": generateUniqueID(),
}})
for {
row, err := iter.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return time.Duration(0), err
}
var id int64
if err := row.Columns(&id); err != nil {
return time.Duration(0), err
}
}
return time.Since(start), nil
}
func executeRead(ctx context.Context, client *spanner.Client, staleness int8) (time.Duration, error) {
start := time.Now()
iter := client.Single().WithTimestampBound(spanner.ExactStaleness(time.Second*time.Duration(staleness))).Read(ctx, tableName, spanner.Key{generateUniqueID()}, []string{"ID"})
for {
row, err := iter.Next()
if errors.Is(err, iterator.Done) {
break
}
if err != nil {
return time.Duration(0), err
}
var id int64
if err := row.Columns(&id); err != nil {
return time.Duration(0), err
}
}
return time.Since(start), nil
}
func enableTracingWithCloudTraceExporter(projectID string, cloudTracingHost string) {
res, err := resource.Merge(resource.Default(),
resource.NewWithAttributes(semconv.SchemaURL,
semconv.ServiceName("Go-Benchmarking"),
semconv.ServiceVersion("0.1.0"),
))
if err != nil {
log.Fatal(err)
}
// Create a new cloud trace exporter
exporter, err := traceExporter.New(traceExporter.WithProjectID(projectID), traceExporter.WithTraceClientOptions([]option.ClientOption{option.WithEndpoint(cloudTracingHost)}))
if err != nil {
log.Fatal(err)
}
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(res),
sdktrace.WithBatcher(exporter),
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.5)),
)
otel.SetTracerProvider(tracerProvider)
}
func runTimer(endTime time.Time, text string) {
for {
var t time.Time
t = t.Add(endTime.Sub(time.Now()))
fmt.Printf("\r%v %v", text, t.Format(time.TimeOnly))
time.Sleep(time.Second)
if time.Now().Local().After(endTime) {
break
}
}
}
func setMonitoringHost(monitoringHost string) error {
if err := os.Setenv("SPANNER_MONITORING_HOST", monitoringHost); err != nil {
return err
}
return nil
}
func parseCommandLineArguments(args []string, bc *benchmarkingConfiguration) error {
if len(args)%2 == 0 {
return errors.New("some configuration is missing")
}
index := 1
for {
if index >= len(args) {
break
}
commandLineOption := args[index][1:]
commandLineValue := args[index+1]
switch commandLineOption {
case "wu", "warmUpTime":
val, err := strconv.ParseInt(commandLineValue, 10, 8)
if err != nil {
return err
}
bc.warmUpTime = int8(val)
case "et", "executionTime":
val, err := strconv.ParseInt(commandLineValue, 10, 8)
if err != nil {
return err
}
bc.executionTime = int8(val)
case "wbr", "waitBetweenRequests":
val, err := strconv.ParseInt(commandLineValue, 10, 8)
if err != nil {
return err
}
bc.waitBetweenRequests = uint8(val)
case "st", "staleness":
val, err := strconv.ParseInt(commandLineValue, 10, 8)
if err != nil {
return err
}
bc.staleness = int8(val)
case "transactionType":
parsedTransactionType, err := parseTransactionType(commandLineValue)
if err != nil {
return err
}
bc.parsedTransactionType = parsedTransactionType
case "te", "tracesEnabled":
tracesEnabled, err := strconv.ParseBool(commandLineValue)
if err != nil {
return err
}
bc.tracesEnabled = tracesEnabled
case "dnm", "disableNativeMetrics":
disableNativeMetrics, err := strconv.ParseBool(commandLineValue)
if err != nil {
return err
}
bc.disableNativeMetrics = disableNativeMetrics
case "tsf", "traceSamplingFraction":
traceSamplingFraction, err := strconv.ParseFloat(commandLineValue, 32)
if err != nil {
return err
}
bc.traceSamplingFraction = float32(traceSamplingFraction)
default:
return fmt.Errorf("invalid option %v", commandLineOption)
}
index += 2
}
return nil
}
func parseTransactionType(s string) (transactionType, error) {
switch s {
case "READ":
return read, nil
case "QUERY":
return query, nil
default:
return query, errors.New("invalid transaction type")
}
}
func parseCloudEnvironment(environment string) cloudEnvironment {
switch environment {
case "DEVEL":
return devel
default:
return production
}
}
func percentiles(percentile float32, latencies []int64) any {
rank := (percentile * float32(len(latencies)-1)) + 1
return latencies[uint(rank)]
}
func generateUniqueID() int64 {
return rand.Int64N(totalRecords) + 1
}
func getRandomWaitTime(waitTime uint8) time.Duration {
if waitTime <= 0 {
return time.Duration(0)
}
return time.Duration(rand.Int64N(int64(2*waitTime-1)) + 1)
}