blob: 956024c7f047994de240970ba38463f189405b9a [file] [log] [blame]
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
Package main provides benchmark with setting flags.
An example to run some benchmarks with profiling enabled:
go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
-compression=gzip -maxConcurrentCalls=1 -trace=off \
-reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
-cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
As a suggestion, when creating a branch, you can run this benchmark and save the result
file "-resultFile=basePerf", and later when you at the middle of the work or finish the
work, you can get the benchmark result and compare it with the base anytime.
Assume there are two result files names as "basePerf" and "curPerf" created by adding
-resultFile=basePerf and -resultFile=curPerf.
To format the curPerf, run:
go run benchmark/benchresult/main.go curPerf
To observe how the performance changes based on a base result, run:
go run benchmark/benchresult/main.go basePerf curPerf
*/
package main
import (
"context"
"encoding/gob"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"reflect"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc"
bm "google.golang.org/grpc/benchmark"
"google.golang.org/grpc/benchmark/flags"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/latency"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn"
)
var (
workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
"Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
"memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
"set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
logger = grpclog.Component("benchmark")
)
const (
workloadsUnary = "unary"
workloadsStreaming = "streaming"
workloadsUnconstrained = "unconstrained"
workloadsAll = "all"
// Compression modes.
compModeOff = "off"
compModeGzip = "gzip"
compModeNop = "nop"
compModeAll = "all"
// Toggle modes.
toggleModeOff = "off"
toggleModeOn = "on"
toggleModeBoth = "both"
// Network modes.
networkModeNone = "none"
networkModeLocal = "Local"
networkModeLAN = "LAN"
networkModeWAN = "WAN"
networkLongHaul = "Longhaul"
numStatsBuckets = 10
warmupCallCount = 10
warmuptime = time.Second
)
var (
allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
defaultReadKbps = []int{0, 10240} // if non-positive, infinite
defaultReadMTU = []int{0} // if non-positive, infinite
defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
networks = map[string]latency.Network{
networkModeLocal: latency.Local,
networkModeLAN: latency.LAN,
networkModeWAN: latency.WAN,
networkLongHaul: latency.Longhaul,
}
keepaliveTime = 10 * time.Second
keepaliveTimeout = 1 * time.Second
// This is 0.8*keepaliveTime to prevent connection issues because of server
// keepalive enforcement.
keepaliveMinTime = 8 * time.Second
)
// runModes indicates the workloads to run. This is initialized with a call to
// `runModesFromWorkloads`, passing the workloads flag set by the user.
type runModes struct {
unary, streaming, unconstrained bool
}
// runModesFromWorkloads determines the runModes based on the value of
// workloads flag set by the user.
func runModesFromWorkloads(workload string) runModes {
r := runModes{}
switch workload {
case workloadsUnary:
r.unary = true
case workloadsStreaming:
r.streaming = true
case workloadsUnconstrained:
r.unconstrained = true
case workloadsAll:
r.unary = true
r.streaming = true
r.unconstrained = true
default:
log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
workloads, strings.Join(allWorkloads, ", "))
}
return r
}
type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCleanupFunc func()
func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
caller, cleanup := makeFuncUnary(bf)
defer cleanup()
runBenchmark(caller, start, stop, bf, s, workloadsUnary)
}
func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
caller, cleanup := makeFuncStream(bf)
defer cleanup()
runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
}
func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
var sender rpcSendFunc
var recver rpcRecvFunc
var cleanup rpcCleanupFunc
if bf.EnablePreloader {
sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
} else {
sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
}
defer cleanup()
var req, resp uint64
go func() {
// Resets the counters once warmed up
<-time.NewTimer(warmuptime).C
atomic.StoreUint64(&req, 0)
atomic.StoreUint64(&resp, 0)
start(workloadsUnconstrained, bf)
}()
bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
var wg sync.WaitGroup
wg.Add(2 * bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
}
sender(pos)
atomic.AddUint64(&req, 1)
}
}(i)
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
}
recver(pos)
atomic.AddUint64(&resp, 1)
}
}(i)
}
wg.Wait()
stop(req, resp)
}
// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
if bf.ModeCompressor == compModeNop {
sopts = append(sopts,
grpc.RPCCompressor(nopCompressor{}),
grpc.RPCDecompressor(nopDecompressor{}),
)
opts = append(opts,
grpc.WithCompressor(nopCompressor{}),
grpc.WithDecompressor(nopDecompressor{}),
)
}
if bf.ModeCompressor == compModeGzip {
sopts = append(sopts,
grpc.RPCCompressor(grpc.NewGZIPCompressor()),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
)
opts = append(opts,
grpc.WithCompressor(grpc.NewGZIPCompressor()),
grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
)
}
if bf.EnableKeepalive {
sopts = append(sopts,
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: keepaliveMinTime,
PermitWithoutStream: true,
}),
)
opts = append(opts,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
PermitWithoutStream: true,
}),
)
}
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
opts = append(opts, grpc.WithInsecure())
var lis net.Listener
if bf.UseBufConn {
bcLis := bufconn.Listen(256 * 1024)
lis = bcLis
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
return bcLis.Dial()
})(ctx, "", "")
}))
} else {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
logger.Fatalf("Failed to listen: %v", err)
}
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
}))
}
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
return testpb.NewBenchmarkServiceClient(conn), func() {
conn.Close()
stopper()
}
}
func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
return func(int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
}
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
unaryCaller(tc, reqSizeBytes, respSizeBytes)
}, cleanup
}
func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
streams[i] = stream
}
return func(pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
}
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
}, cleanup
}
func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)
preparedMsg := make([]*grpc.PreparedMsg, len(streams))
for i, stream := range streams {
preparedMsg[i] = &grpc.PreparedMsg{}
err := preparedMsg[i].Encode(stream, req)
if err != nil {
logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
}
}
return func(pos int) {
streams[pos].SendMsg(preparedMsg[pos])
}, func(pos int) {
streams[pos].Recv()
}, cleanup
}
func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)
return func(pos int) {
streams[pos].Send(req)
}, func(pos int) {
streams[pos].Recv()
}, cleanup
}
func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.UnconstrainedStreamingCall(context.Background())
if err != nil {
logger.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
}
streams[i] = stream
}
pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(bf.RespSizeBytes),
Payload: pl,
}
return streams, req, cleanup
}
// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
// request and response sizes.
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
logger.Fatalf("DoUnaryCall failed: %v", err)
}
}
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
}
func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
// Warm up connection.
for i := 0; i < warmupCallCount; i++ {
caller(0)
}
// Run benchmark.
start(mode, bf)
var wg sync.WaitGroup
wg.Add(bf.MaxConcurrentCalls)
bmEnd := time.Now().Add(bf.BenchTime)
var count uint64
for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) {
defer wg.Done()
for {
t := time.Now()
if t.After(bmEnd) {
return
}
start := time.Now()
caller(pos)
elapse := time.Since(start)
atomic.AddUint64(&count, 1)
s.AddDuration(elapse)
}
}(i)
}
wg.Wait()
stop(count)
}
// benchOpts represents all configurable options available while running this
// benchmark. This is built from the values passed as flags.
type benchOpts struct {
rModes runModes
benchTime time.Duration
memProfileRate int
memProfile string
cpuProfile string
networkMode string
benchmarkResultFile string
useBufconn bool
enableKeepalive bool
features *featureOpts
}
// featureOpts represents options which can have multiple values. The user
// usually provides a comma-separated list of options for each of these
// features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination.
type featureOpts struct {
enableTrace []bool
readLatencies []time.Duration
readKbps []int
readMTU []int
maxConcurrentCalls []int
reqSizeBytes []int
respSizeBytes []int
reqPayloadCurves []*stats.PayloadCurve
respPayloadCurves []*stats.PayloadCurve
compModes []string
enableChannelz []bool
enablePreloader []bool
}
// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
// element of the slice (indexed by 'featuresIndex' enum) contains the number
// of features to be exercised by the benchmark code.
// For example: Index 0 of the returned slice contains the number of values for
// enableTrace feature, while index 1 contains the number of value of
// readLatencies feature and so on.
func makeFeaturesNum(b *benchOpts) []int {
featuresNum := make([]int, stats.MaxFeatureIndex)
for i := 0; i < len(featuresNum); i++ {
switch stats.FeatureIndex(i) {
case stats.EnableTraceIndex:
featuresNum[i] = len(b.features.enableTrace)
case stats.ReadLatenciesIndex:
featuresNum[i] = len(b.features.readLatencies)
case stats.ReadKbpsIndex:
featuresNum[i] = len(b.features.readKbps)
case stats.ReadMTUIndex:
featuresNum[i] = len(b.features.readMTU)
case stats.MaxConcurrentCallsIndex:
featuresNum[i] = len(b.features.maxConcurrentCalls)
case stats.ReqSizeBytesIndex:
featuresNum[i] = len(b.features.reqSizeBytes)
case stats.RespSizeBytesIndex:
featuresNum[i] = len(b.features.respSizeBytes)
case stats.ReqPayloadCurveIndex:
featuresNum[i] = len(b.features.reqPayloadCurves)
case stats.RespPayloadCurveIndex:
featuresNum[i] = len(b.features.respPayloadCurves)
case stats.CompModesIndex:
featuresNum[i] = len(b.features.compModes)
case stats.EnableChannelzIndex:
featuresNum[i] = len(b.features.enableChannelz)
case stats.EnablePreloaderIndex:
featuresNum[i] = len(b.features.enablePreloader)
default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
}
}
return featuresNum
}
// sharedFeatures returns a bool slice which acts as a bitmask. Each item in
// the slice represents a feature, indexed by 'featureIndex' enum. The bit is
// set to 1 if the corresponding feature does not have multiple value, so is
// shared amongst all benchmarks.
func sharedFeatures(featuresNum []int) []bool {
result := make([]bool, len(featuresNum))
for i, num := range featuresNum {
if num <= 1 {
result[i] = true
}
}
return result
}
// generateFeatures generates all combinations of the provided feature options.
// While all the feature options are stored in the benchOpts struct, the input
// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
// the number of values for each feature.
// For example, let's say the user sets -workloads=all and
// -maxConcurrentCalls=1,100, this would end up with the following
// combinations:
// [workloads: unary, maxConcurrentCalls=1]
// [workloads: unary, maxConcurrentCalls=1]
// [workloads: streaming, maxConcurrentCalls=100]
// [workloads: streaming, maxConcurrentCalls=100]
// [workloads: unconstrained, maxConcurrentCalls=1]
// [workloads: unconstrained, maxConcurrentCalls=100]
func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
// curPos and initialPos are two slices where each value acts as an index
// into the appropriate feature slice maintained in benchOpts.features. This
// loop generates all possible combinations of features by changing one value
// at a time, and once curPos becomes equal to initialPos, we have explored
// all options.
var result []stats.Features
var curPos []int
initialPos := make([]int, stats.MaxFeatureIndex)
for !reflect.DeepEqual(initialPos, curPos) {
if curPos == nil {
curPos = make([]int, stats.MaxFeatureIndex)
}
f := stats.Features{
// These features stay the same for each iteration.
NetworkMode: b.networkMode,
UseBufConn: b.useBufconn,
EnableKeepalive: b.enableKeepalive,
BenchTime: b.benchTime,
// These features can potentially change for each iteration.
EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
}
if len(b.features.reqPayloadCurves) == 0 {
f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
} else {
f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
}
if len(b.features.respPayloadCurves) == 0 {
f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
} else {
f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
}
result = append(result, f)
addOne(curPos, featuresNum)
}
return result
}
// addOne mutates the input slice 'features' by changing one feature, thus
// arriving at the next combination of feature values. 'featuresMaxPosition'
// provides the numbers of allowed values for each feature, indexed by
// 'featureIndex' enum.
func addOne(features []int, featuresMaxPosition []int) {
for i := len(features) - 1; i >= 0; i-- {
if featuresMaxPosition[i] == 0 {
continue
}
features[i] = (features[i] + 1)
if features[i]/featuresMaxPosition[i] == 0 {
break
}
features[i] = features[i] % featuresMaxPosition[i]
}
}
// processFlags reads the command line flags and builds benchOpts. Specifying
// invalid values for certain flags will cause flag.Parse() to fail, and the
// program to terminate.
// This *SHOULD* be the only place where the flags are accessed. All other
// parts of the benchmark code should rely on the returned benchOpts.
func processFlags() *benchOpts {
flag.Parse()
if flag.NArg() != 0 {
log.Fatal("Error: unparsed arguments: ", flag.Args())
}
opts := &benchOpts{
rModes: runModesFromWorkloads(*workloads),
benchTime: *benchTime,
memProfileRate: *memProfileRate,
memProfile: *memProfile,
cpuProfile: *cpuProfile,
networkMode: *networkMode,
benchmarkResultFile: *benchmarkResultFile,
useBufconn: *useBufconn,
enableKeepalive: *enableKeepalive,
features: &featureOpts{
enableTrace: setToggleMode(*traceMode),
readLatencies: append([]time.Duration(nil), *readLatency...),
readKbps: append([]int(nil), *readKbps...),
readMTU: append([]int(nil), *readMTU...),
maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
respSizeBytes: append([]int(nil), *readRespSizeBytes...),
compModes: setCompressorMode(*compressorMode),
enableChannelz: setToggleMode(*channelzOn),
enablePreloader: setToggleMode(*preloaderMode),
},
}
if len(*reqPayloadCurveFiles) == 0 {
if len(opts.features.reqSizeBytes) == 0 {
opts.features.reqSizeBytes = defaultReqSizeBytes
}
} else {
if len(opts.features.reqSizeBytes) != 0 {
log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
}
for _, file := range *reqPayloadCurveFiles {
pc, err := stats.NewPayloadCurve(file)
if err != nil {
log.Fatalf("cannot load payload curve file %s: %v", file, err)
}
opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
}
opts.features.reqSizeBytes = nil
}
if len(*respPayloadCurveFiles) == 0 {
if len(opts.features.respSizeBytes) == 0 {
opts.features.respSizeBytes = defaultRespSizeBytes
}
} else {
if len(opts.features.respSizeBytes) != 0 {
log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
}
for _, file := range *respPayloadCurveFiles {
pc, err := stats.NewPayloadCurve(file)
if err != nil {
log.Fatalf("cannot load payload curve file %s: %v", file, err)
}
opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
}
opts.features.respSizeBytes = nil
}
// Re-write latency, kpbs and mtu if network mode is set.
if network, ok := networks[opts.networkMode]; ok {
opts.features.readLatencies = []time.Duration{network.Latency}
opts.features.readKbps = []int{network.Kbps}
opts.features.readMTU = []int{network.MTU}
}
return opts
}
func setToggleMode(val string) []bool {
switch val {
case toggleModeOn:
return []bool{true}
case toggleModeOff:
return []bool{false}
case toggleModeBoth:
return []bool{false, true}
default:
// This should never happen because a wrong value passed to this flag would
// be caught during flag.Parse().
return []bool{}
}
}
func setCompressorMode(val string) []string {
switch val {
case compModeNop, compModeGzip, compModeOff:
return []string{val}
case compModeAll:
return []string{compModeNop, compModeGzip, compModeOff}
default:
// This should never happen because a wrong value passed to this flag would
// be caught during flag.Parse().
return []string{}
}
}
func main() {
opts := processFlags()
before(opts)
s := stats.NewStats(numStatsBuckets)
featuresNum := makeFeaturesNum(opts)
sf := sharedFeatures(featuresNum)
var (
start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
stop = func(count uint64) { s.EndRun(count) }
ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
)
for _, bf := range opts.generateFeatures(featuresNum) {
grpc.EnableTracing = bf.EnableTrace
if bf.EnableChannelz {
channelz.TurnOn()
}
if opts.rModes.unary {
unaryBenchmark(start, stop, bf, s)
}
if opts.rModes.streaming {
streamBenchmark(start, stop, bf, s)
}
if opts.rModes.unconstrained {
unconstrainedStreamBenchmark(start, ucStop, bf, s)
}
}
after(opts, s.GetResults())
}
func before(opts *benchOpts) {
if opts.memProfile != "" {
runtime.MemProfileRate = opts.memProfileRate
}
if opts.cpuProfile != "" {
f, err := os.Create(opts.cpuProfile)
if err != nil {
fmt.Fprintf(os.Stderr, "testing: %s\n", err)
return
}
if err := pprof.StartCPUProfile(f); err != nil {
fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
f.Close()
return
}
}
}
func after(opts *benchOpts, data []stats.BenchResults) {
if opts.cpuProfile != "" {
pprof.StopCPUProfile() // flushes profile to disk
}
if opts.memProfile != "" {
f, err := os.Create(opts.memProfile)
if err != nil {
fmt.Fprintf(os.Stderr, "testing: %s\n", err)
os.Exit(2)
}
runtime.GC() // materialize all statistics
if err = pprof.WriteHeapProfile(f); err != nil {
fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
os.Exit(2)
}
f.Close()
}
if opts.benchmarkResultFile != "" {
f, err := os.Create(opts.benchmarkResultFile)
if err != nil {
log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
}
dataEncoder := gob.NewEncoder(f)
dataEncoder.Encode(data)
f.Close()
}
}
// nopCompressor is a compressor that just copies data.
type nopCompressor struct{}
func (nopCompressor) Do(w io.Writer, p []byte) error {
n, err := w.Write(p)
if err != nil {
return err
}
if n != len(p) {
return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
}
return nil
}
func (nopCompressor) Type() string { return compModeNop }
// nopDecompressor is a decompressor that just copies data.
type nopDecompressor struct{}
func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
func (nopDecompressor) Type() string { return compModeNop }