blob: 6f6cde7d846ce0fedaa78ff71bcb9efdb3b9a4b5 [file] [log] [blame]
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package interop contains functions used by interop client/server.
package interop
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var (
reqSizes = []int{27182, 8, 1828, 45904}
respSizes = []int{31415, 9, 2653, 58979}
largeReqSize = 271828
largeRespSize = 314159
initialMetadataKey = "x-grpc-test-echo-initial"
trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
logger = grpclog.Component("interop")
)
// ClientNewPayload returns a payload of the given type and size.
func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
if size < 0 {
logger.Fatalf("Requested a response with invalid length %d", size)
}
body := make([]byte, size)
switch t {
case testpb.PayloadType_COMPRESSABLE:
default:
logger.Fatalf("Unsupported payload type: %d", t)
}
return &testpb.Payload{
Type: t,
Body: body,
}
}
// DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
func DoEmptyUnaryCall(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
if err != nil {
logger.Fatal("/TestService/EmptyCall RPC failed: ", err)
}
if !proto.Equal(&testpb.Empty{}, reply) {
logger.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
}
}
// DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
func DoLargeUnaryCall(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
}
reply, err := tc.UnaryCall(context.Background(), req, args...)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
}
}
// DoClientStreaming performs a client streaming RPC.
func DoClientStreaming(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.StreamingInputCall(context.Background(), args...)
if err != nil {
logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
}
var sum int
for _, s := range reqSizes {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
req := &testpb.StreamingInputCallRequest{
Payload: pl,
}
if err := stream.Send(req); err != nil {
logger.Fatalf("%v has error %v while sending %v", stream, err, req)
}
sum += s
}
reply, err := stream.CloseAndRecv()
if err != nil {
logger.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
if reply.GetAggregatedPayloadSize() != int32(sum) {
logger.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
}
}
// DoServerStreaming performs a server streaming RPC.
func DoServerStreaming(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
respParam := make([]*testpb.ResponseParameters, len(respSizes))
for i, s := range respSizes {
respParam[i] = &testpb.ResponseParameters{
Size: int32(s),
}
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
}
stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
if err != nil {
logger.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
}
var rpcStatus error
var respCnt int
var index int
for {
reply, err := stream.Recv()
if err != nil {
rpcStatus = err
break
}
t := reply.GetPayload().GetType()
if t != testpb.PayloadType_COMPRESSABLE {
logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
}
size := len(reply.GetPayload().GetBody())
if size != respSizes[index] {
logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
}
index++
respCnt++
}
if rpcStatus != io.EOF {
logger.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
}
if respCnt != len(respSizes) {
logger.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
}
}
// DoPingPong performs ping-pong style bi-directional streaming RPC.
func DoPingPong(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil {
logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
}
var index int
for index < len(reqSizes) {
respParam := []*testpb.ResponseParameters{
{
Size: int32(respSizes[index]),
},
}
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: pl,
}
if err := stream.Send(req); err != nil {
logger.Fatalf("%v has error %v while sending %v", stream, err, req)
}
reply, err := stream.Recv()
if err != nil {
logger.Fatalf("%v.Recv() = %v", stream, err)
}
t := reply.GetPayload().GetType()
if t != testpb.PayloadType_COMPRESSABLE {
logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
}
size := len(reply.GetPayload().GetBody())
if size != respSizes[index] {
logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
}
index++
}
if err := stream.CloseSend(); err != nil {
logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
}
if _, err := stream.Recv(); err != io.EOF {
logger.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
}
}
// DoEmptyStream sets up a bi-directional streaming with zero message.
func DoEmptyStream(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil {
logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
}
if err := stream.CloseSend(); err != nil {
logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
}
if _, err := stream.Recv(); err != io.EOF {
logger.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
}
}
// DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
func DoTimeoutOnSleepingServer(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
return
}
logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
}
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
Payload: pl,
}
if err := stream.Send(req); err != nil && err != io.EOF {
logger.Fatalf("%v.Send(_) = %v", stream, err)
}
if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
logger.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
}
}
// DoComputeEngineCreds performs a unary RPC with compute engine auth.
func DoComputeEngineCreds(tc testgrpc.TestServiceClient, serviceAccount, oauthScope string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
user := reply.GetUsername()
scope := reply.GetOauthScope()
if user != serviceAccount {
logger.Fatalf("Got user name %q, want %q.", user, serviceAccount)
}
if !strings.Contains(oauthScope, scope) {
logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
}
}
func getServiceAccountJSONKey(keyFile string) []byte {
jsonKey, err := os.ReadFile(keyFile)
if err != nil {
logger.Fatalf("Failed to read the service account key file: %v", err)
}
return jsonKey
}
// DoServiceAccountCreds performs a unary RPC with service account auth.
func DoServiceAccountCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
user := reply.GetUsername()
scope := reply.GetOauthScope()
if !strings.Contains(string(jsonKey), user) {
logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
}
if !strings.Contains(oauthScope, scope) {
logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
}
}
// DoJWTTokenCreds performs a unary RPC with JWT token auth.
func DoJWTTokenCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
user := reply.GetUsername()
if !strings.Contains(string(jsonKey), user) {
logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
}
}
// GetToken obtains an OAUTH token from the input.
func GetToken(serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
if err != nil {
logger.Fatalf("Failed to get the config: %v", err)
}
token, err := config.TokenSource(context.Background()).Token()
if err != nil {
logger.Fatalf("Failed to get the token: %v", err)
}
return token
}
// DoOauth2TokenCreds performs a unary RPC with OAUTH2 token auth.
func DoOauth2TokenCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
user := reply.GetUsername()
scope := reply.GetOauthScope()
if !strings.Contains(string(jsonKey), user) {
logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
}
if !strings.Contains(oauthScope, scope) {
logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
}
}
// DoPerRPCCreds performs a unary RPC with per RPC OAUTH2 token.
func DoPerRPCCreds(tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
token := GetToken(serviceAccountKeyFile, oauthScope)
kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
ctx := metadata.NewOutgoingContext(context.Background(), metadata.MD{"authorization": []string{kv["authorization"]}})
reply, err := tc.UnaryCall(ctx, req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
user := reply.GetUsername()
scope := reply.GetOauthScope()
if !strings.Contains(string(jsonKey), user) {
logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
}
if !strings.Contains(oauthScope, scope) {
logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
}
}
// DoGoogleDefaultCredentials performs an unary RPC with google default credentials
func DoGoogleDefaultCredentials(tc testgrpc.TestServiceClient, defaultServiceAccount string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
if reply.GetUsername() != defaultServiceAccount {
logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
}
}
// DoComputeEngineChannelCredentials performs an unary RPC with compute engine channel credentials
func DoComputeEngineChannelCredentials(tc testgrpc.TestServiceClient, defaultServiceAccount string) {
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
FillUsername: true,
FillOauthScope: true,
}
reply, err := tc.UnaryCall(context.Background(), req)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
if reply.GetUsername() != defaultServiceAccount {
logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
}
}
var testMetadata = metadata.MD{
"key1": []string{"value1"},
"key2": []string{"value2"},
}
// DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
func DoCancelAfterBegin(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(context.Background(), testMetadata))
stream, err := tc.StreamingInputCall(ctx, args...)
if err != nil {
logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
}
cancel()
_, err = stream.CloseAndRecv()
if status.Code(err) != codes.Canceled {
logger.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
}
}
// DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
func DoCancelAfterFirstResponse(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
ctx, cancel := context.WithCancel(context.Background())
stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil {
logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
}
respParam := []*testpb.ResponseParameters{
{
Size: 31415,
},
}
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: pl,
}
if err := stream.Send(req); err != nil {
logger.Fatalf("%v has error %v while sending %v", stream, err, req)
}
if _, err := stream.Recv(); err != nil {
logger.Fatalf("%v.Recv() = %v", stream, err)
}
cancel()
if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
logger.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
}
}
var (
initialMetadataValue = "test_initial_metadata_value"
trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
customMetadata = metadata.Pairs(
initialMetadataKey, initialMetadataValue,
trailingMetadataKey, trailingMetadataValue,
)
)
func validateMetadata(header, trailer metadata.MD) {
if len(header[initialMetadataKey]) != 1 {
logger.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
}
if header[initialMetadataKey][0] != initialMetadataValue {
logger.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
}
if len(trailer[trailingMetadataKey]) != 1 {
logger.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
}
if trailer[trailingMetadataKey][0] != trailingMetadataValue {
logger.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
}
}
// DoCustomMetadata checks that metadata is echoed back to the client.
func DoCustomMetadata(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
// Testing with UnaryCall.
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(1),
Payload: pl,
}
ctx := metadata.NewOutgoingContext(context.Background(), customMetadata)
var header, trailer metadata.MD
args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
reply, err := tc.UnaryCall(
ctx,
req,
args...,
)
if err != nil {
logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
}
validateMetadata(header, trailer)
// Testing with FullDuplex.
stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil {
logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
respParam := []*testpb.ResponseParameters{
{
Size: 1,
},
}
streamReq := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: pl,
}
if err := stream.Send(streamReq); err != nil {
logger.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
}
streamHeader, err := stream.Header()
if err != nil {
logger.Fatalf("%v.Header() = %v", stream, err)
}
if _, err := stream.Recv(); err != nil {
logger.Fatalf("%v.Recv() = %v", stream, err)
}
if err := stream.CloseSend(); err != nil {
logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
}
if _, err := stream.Recv(); err != io.EOF {
logger.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
}
streamTrailer := stream.Trailer()
validateMetadata(streamHeader, streamTrailer)
}
// DoStatusCodeAndMessage checks that the status code is propagated back to the client.
func DoStatusCodeAndMessage(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
var code int32 = 2
msg := "test status message"
expectedErr := status.Error(codes.Code(code), msg)
respStatus := &testpb.EchoStatus{
Code: code,
Message: msg,
}
// Test UnaryCall.
req := &testpb.SimpleRequest{
ResponseStatus: respStatus,
}
if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
}
// Test FullDuplexCall.
stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil {
logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
streamReq := &testpb.StreamingOutputCallRequest{
ResponseStatus: respStatus,
}
if err := stream.Send(streamReq); err != nil {
logger.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
}
if err := stream.CloseSend(); err != nil {
logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
}
if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
logger.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
}
}
// DoSpecialStatusMessage verifies Unicode and whitespace is correctly processed
// in status message.
func DoSpecialStatusMessage(tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
const (
code int32 = 2
msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
)
expectedErr := status.Error(codes.Code(code), msg)
req := &testpb.SimpleRequest{
ResponseStatus: &testpb.EchoStatus{
Code: code,
Message: msg,
},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
}
}
// DoUnimplementedService attempts to call a method from an unimplemented service.
func DoUnimplementedService(tc testgrpc.UnimplementedServiceClient) {
_, err := tc.UnimplementedCall(context.Background(), &testpb.Empty{})
if status.Code(err) != codes.Unimplemented {
logger.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
}
}
// DoUnimplementedMethod attempts to call an unimplemented method.
func DoUnimplementedMethod(cc *grpc.ClientConn) {
var req, reply proto.Message
if err := cc.Invoke(context.Background(), "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
logger.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
}
}
// DoPickFirstUnary runs multiple RPCs (rpcCount) and checks that all requests
// are sent to the same backend.
func DoPickFirstUnary(tc testgrpc.TestServiceClient) {
const rpcCount = 100
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(1),
Payload: pl,
FillServerId: true,
}
// TODO(mohanli): Revert the timeout back to 10s once TD migrates to xdstp.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var serverID string
for i := 0; i < rpcCount; i++ {
resp, err := tc.UnaryCall(ctx, req)
if err != nil {
logger.Fatalf("iteration %d, failed to do UnaryCall: %v", i, err)
}
id := resp.ServerId
if id == "" {
logger.Fatalf("iteration %d, got empty server ID", i)
}
if i == 0 {
serverID = id
continue
}
if serverID != id {
logger.Fatalf("iteration %d, got different server ids: %q vs %q", i, serverID, id)
}
}
}
func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
start := time.Now()
client := tc
if resetChannel {
var conn *grpc.ClientConn
conn, err = grpc.Dial(serverAddr, dopts...)
if err != nil {
return
}
defer conn.Close()
client = testgrpc.NewTestServiceClient(conn)
}
// per test spec, don't include channel shutdown in latency measurement
defer func() { latency = time.Since(start) }()
// do a large-unary RPC
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
}
var reply *testpb.SimpleResponse
reply, err = client.UnaryCall(ctx, req, copts...)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
return
}
return
}
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) {
start := time.Now()
ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
defer cancel()
iterationsDone := 0
totalFailures := 0
hopts := stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
}
h := stats.NewHistogram(hopts)
for i := 0; i < soakIterations; i++ {
if time.Now().After(overallDeadline) {
break
}
earliestNextStart := time.After(minTimeBetweenRPCs)
iterationsDone++
var p peer.Peer
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts, []grpc.CallOption{grpc.Peer(&p)})
latencyMs := int64(latency / time.Millisecond)
h.Add(latencyMs)
if err != nil {
totalFailures++
addrStr := "nil"
if p.Addr != nil {
addrStr = p.Addr.String()
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
<-earliestNextStart
continue
}
if latency > perIterationMaxAcceptableLatency {
totalFailures++
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
<-earliestNextStart
continue
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
<-earliestNextStart
}
var b bytes.Buffer
h.Print(&b)
fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
if iterationsDone < soakIterations {
logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, overallDeadline.Sub(start).Seconds(), iterationsDone, soakIterations)
}
if totalFailures > maxFailures {
logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
}
}
type testServer struct {
testgrpc.UnimplementedTestServiceServer
}
// NewTestServer creates a test server for test service.
func NewTestServer() testgrpc.TestServiceServer {
return &testServer{}
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return new(testpb.Empty), nil
}
func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
if size < 0 {
return nil, fmt.Errorf("requested a response with invalid length %d", size)
}
body := make([]byte, size)
switch t {
case testpb.PayloadType_COMPRESSABLE:
default:
return nil, fmt.Errorf("unsupported payload type: %d", t)
}
return &testpb.Payload{
Type: t,
Body: body,
}, nil
}
func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
st := in.GetResponseStatus()
if md, ok := metadata.FromIncomingContext(ctx); ok {
if initialMetadata, ok := md[initialMetadataKey]; ok {
header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
grpc.SendHeader(ctx, header)
}
if trailingMetadata, ok := md[trailingMetadataKey]; ok {
trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
grpc.SetTrailer(ctx, trailer)
}
}
if st != nil && st.Code != 0 {
return nil, status.Error(codes.Code(st.Code), st.Message)
}
pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
if err != nil {
return nil, err
}
return &testpb.SimpleResponse{
Payload: pl,
}, nil
}
func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
cs := args.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
if err != nil {
return err
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: pl,
}); err != nil {
return err
}
}
return nil
}
func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
var sum int
for {
in, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{
AggregatedPayloadSize: int32(sum),
})
}
if err != nil {
return err
}
p := in.GetPayload().GetBody()
sum += len(p)
}
}
func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
if initialMetadata, ok := md[initialMetadataKey]; ok {
header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
stream.SendHeader(header)
}
if trailingMetadata, ok := md[trailingMetadataKey]; ok {
trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
stream.SetTrailer(trailer)
}
}
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
return nil
}
if err != nil {
return err
}
st := in.GetResponseStatus()
if st != nil && st.Code != 0 {
return status.Error(codes.Code(st.Code), st.Message)
}
cs := in.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
if err != nil {
return err
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: pl,
}); err != nil {
return err
}
}
}
}
func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallServer) error {
var msgBuf []*testpb.StreamingOutputCallRequest
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
break
}
if err != nil {
return err
}
msgBuf = append(msgBuf, in)
}
for _, m := range msgBuf {
cs := m.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
if err != nil {
return err
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: pl,
}); err != nil {
return err
}
}
}
return nil
}