blob: 651ee1c23e73ed08c71cafb96dd15b5d9188f3e3 [file] [log] [blame]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"crypto/tls"
"log"
"net/http"
"time"
"cloud.google.com/go/storage"
"golang.org/x/net/http2"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
"google.golang.org/grpc"
)
// clientPool pools a number of Storage clients for use without blocking, ie.
// a client that is received through Get may still be in use by one or more other
// calls.
type clientPool struct {
clients []*storage.Client
clientQueue chan *storage.Client
}
// newClientPool initializes the pool with numClients clients initialized using
// the initializeClient func.
// Returns the client pool and a cleanup func to be called to close the pool.
func newClientPool(initializeClient func() (*storage.Client, error), numClients int) (*clientPool, func()) {
p := &clientPool{
clients: make([]*storage.Client, numClients),
clientQueue: make(chan *storage.Client, numClients),
}
for i := 0; i < numClients; i++ {
var err error
p.clients[i], err = initializeClient()
if err != nil {
log.Fatalf("initializeClient: %v", err)
}
// Fill the queue with clients as they are created
p.clientQueue <- p.clients[i]
}
return p, func() {
for _, c := range p.clients {
c.Close()
}
}
}
// Get rotates through clients. This means the work may not be evenly distributed,
// particularly if using varying object sizes.
// The rotation is not 100% deterministic (ie. clients may swap places in the
// queue) when using multiple workers.
func (p *clientPool) Get() *storage.Client {
client := <-p.clientQueue
// return client to queue so that it will be used again without blocking
p.clientQueue <- client
return client
}
var xmlClients, jsonClients, gRPCClients, nonBenchmarkingClients *clientPool
// initializeClientPools creates separate client pools for XML, JSON, and gRPC,
// and only creates those if required. For example, if the input parameter `api`
// is set to `JSON`, the JSON pool is initialized but not the XML or GRPC pools.
func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
var closeNonBenchmarking, closeXML, closeJSON, closeGRPC func()
nonBenchmarkingClients, closeNonBenchmarking = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: useDefault,
readBufferSize: useDefault,
})
},
1,
)
// Init XML clients if necessary
if opts.api == mixedAPIs || opts.api == xmlAPI {
xmlClients, closeXML = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: false,
setGCSFuseOpts: opts.useGCSFuseConfig,
endpoint: opts.endpoint,
})
},
opts.numClients,
)
}
// Init JSON clients if necessary
// There is no XML implementation for uploads, so we also initialize JSON
// clients if given that value.
if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI {
jsonClients, closeJSON = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: true,
setGCSFuseOpts: opts.useGCSFuseConfig,
endpoint: opts.endpoint,
})
},
opts.numClients,
)
}
// Init GRPC clients if necessary
if opts.api == mixedAPIs || opts.api == grpcAPI || opts.api == directPath {
gRPCClients, closeGRPC = newClientPool(
func() (*storage.Client, error) {
return initializeGRPCClient(context.Background(), clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
connectionPoolSize: opts.connPoolSize,
endpoint: opts.endpoint,
})
},
opts.numClients,
)
}
return func() {
closeNonBenchmarking()
if closeXML != nil {
closeXML()
}
if closeJSON != nil {
closeJSON()
}
if closeGRPC != nil {
closeGRPC()
}
}
}
// Rotate through clients. This may mean certain clients get a larger workload
// than others, if object sizes vary.
func getClient(ctx context.Context, api benchmarkAPI) *storage.Client {
switch api {
case grpcAPI, directPath:
return gRPCClients.Get()
case jsonAPI:
return jsonClients.Get()
case xmlAPI:
return xmlClients.Get()
}
return nil
}
// Client config
type clientConfig struct {
writeBufferSize, readBufferSize int
endpoint string
useJSON bool // only applicable to HTTP Clients
setGCSFuseOpts bool // only applicable to HTTP Clients
connectionPoolSize int // only applicable to GRPC Clients
}
func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{}
if len(config.endpoint) > 0 {
opts = append(opts, option.WithEndpoint(config.endpoint))
}
if config.writeBufferSize != useDefault || config.readBufferSize != useDefault || config.setGCSFuseOpts {
// We need to modify the underlying HTTP client
base := http.DefaultTransport.(*http.Transport).Clone()
// Set MaxIdleConnsPerHost for parity with the Storage library, as it
// sets this as well
base.MaxIdleConnsPerHost = 100
if config.setGCSFuseOpts {
base = &http.Transport{
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
// This disables HTTP/2 in transport.
TLSNextProto: make(
map[string]func(string, *tls.Conn) http.RoundTripper,
),
}
} else {
http2Trans, err := http2.ConfigureTransports(base)
if err == nil {
http2Trans.ReadIdleTimeout = time.Second * 31
}
}
base.WriteBufferSize = config.writeBufferSize
base.ReadBufferSize = config.readBufferSize
trans, err := htransport.NewTransport(ctx, base,
option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control"))
if err != nil {
return nil, err
}
opts = append(opts, option.WithHTTPClient(&http.Client{Transport: trans}))
}
if config.useJSON {
opts = append(opts, storage.WithJSONReads())
}
// Init client
client, err := storage.NewClient(ctx, opts...)
return client, err
}
func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{option.WithGRPCConnectionPool(config.connectionPoolSize)}
if len(config.endpoint) > 0 {
opts = append(opts, option.WithEndpoint(config.endpoint))
}
if config.writeBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(config.writeBufferSize)))
}
if config.readBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize)))
}
client, err := storage.NewGRPCClient(ctx, opts...)
return client, err
}