// blob: e58a464527579d9f070479fe358e19c84c582bcd [file] [log] [blame]
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package observability
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"testing"
"time"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/grpc_testing"
)
// s is the test-suite type for this file: every test case below is declared
// as a method on s, and Test hands an s value to grpctest.RunSubTests.
type s struct {
// Tester is embedded from grpctest.
// NOTE(review): presumably it supplies the suite's setup/teardown hooks —
// confirm against the grpctest package.
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// init registers, with the leak checker, the goroutines that are known to
// outlive individual tests and therefore must be ignored.
func init() {
	ignoredGoroutines := []string{
		// OpenCensus, once included in the binary, spawns a global recorder
		// goroutine that the application cannot control.
		// https://github.com/census-instrumentation/opencensus-go/issues/1191
		"go.opencensus.io/stats/view.(*worker).start",
		// google-cloud-go leaks its HTTP client; upstream is aware of it:
		// https://github.com/googleapis/google-cloud-go/issues/1183
		"internal/poll.runtime_pollWait",
	}
	for _, name := range ignoredGoroutines {
		leakcheck.RegisterIgnoreGoroutine(name)
	}
}
// Fixtures shared by the tests of this package. Several of them are not
// referenced in this file; presumably they are used by sibling test files.
var (
	// defaultTestTimeout bounds the contexts created by the tests.
	defaultTestTimeout = 10 * time.Second

	// Header and trailer metadata fixtures.
	testHeaderMetadata  = metadata.MD{"header": {"HeADer"}}
	testTrailerMetadata = metadata.MD{"trailer": {"TrAileR"}}

	// Message payload fixtures.
	testOkPayload    = []byte("Hello World")
	testErrorPayload = []byte("Martha")

	// testErrorMessage is an error text fixture.
	testErrorMessage = "test case injected error"

	// infinitySizeBytes is 1 GiB expressed as an int32.
	infinitySizeBytes int32 = 1 << 30

	// defaultRequestCount is the number of unary RPCs issued by
	// TestOpenCensusIntegration.
	defaultRequestCount = 24
)
// Names for the kinds of aggregation data that fakeOpenCensusExporter.ExportView
// recognizes. ExportView stores one of these in SeenViews, keyed by view name.
const (
// TypeOpenCensusViewDistribution is recorded for *view.DistributionData rows.
// Note that this is the only constant of the group with an explicit type.
TypeOpenCensusViewDistribution string = "distribution"
// TypeOpenCensusViewCount is recorded for *view.CountData rows.
TypeOpenCensusViewCount = "count"
// TypeOpenCensusViewSum is recorded for *view.SumData rows.
TypeOpenCensusViewSum = "sum"
// TypeOpenCensusViewLastValue is recorded for *view.LastValueData rows.
TypeOpenCensusViewLastValue = "last_value"
)
// fakeOpenCensusExporter is a test double that is returned from the stubbed
// newExporter function. Instead of exporting anything it remembers which
// views and how many spans were handed to it, and logs them through t.
type fakeOpenCensusExporter struct {
// SeenViews maps the name of each exported view to the kind of data it
// carried (one of the TypeOpenCensusView* constants). Guarded by mu.
SeenViews map[string]string
// SeenSpans is the number of spans exported so far. Guarded by mu.
SeenSpans int
// t is the test whose log receives one line per exported row/span.
t *testing.T
// mu guards SeenViews and SeenSpans: the Export* methods take the write
// lock, and the tests take the read lock while inspecting the fields.
mu sync.RWMutex
}
// ExportView records which kind of aggregation data was exported for the view
// that vd belongs to. One log line is written per row. A row whose data is of
// an unrecognized kind is logged but leaves SeenViews untouched.
func (fe *fakeOpenCensusExporter) ExportView(vd *view.Data) {
	fe.mu.Lock()
	defer fe.mu.Unlock()

	for _, row := range vd.Rows {
		fe.t.Logf("Metrics[%s]", vd.View.Name)

		var kind string
		switch row.Data.(type) {
		case *view.DistributionData:
			kind = TypeOpenCensusViewDistribution
		case *view.CountData:
			kind = TypeOpenCensusViewCount
		case *view.SumData:
			kind = TypeOpenCensusViewSum
		case *view.LastValueData:
			kind = TypeOpenCensusViewLastValue
		default:
			// Unknown data kind: nothing to record for this row.
			continue
		}
		fe.SeenViews[vd.View.Name] = kind
	}
}
// ExportSpan counts the exported span in SeenSpans and logs its name.
func (fe *fakeOpenCensusExporter) ExportSpan(vd *trace.SpanData) {
	spanName := vd.Name

	fe.mu.Lock()
	defer fe.mu.Unlock()

	fe.SeenSpans++
	fe.t.Logf("Span[%v]", spanName)
}
// Flush is a no-op: the fake exporter records data as soon as it is handed
// over, so there is nothing buffered to flush.
func (fe *fakeOpenCensusExporter) Flush() {}
// Close does nothing and always returns nil; the fake exporter holds no
// resources of its own.
func (fe *fakeOpenCensusExporter) Close() error {
return nil
}
// TestRefuseStartWithInvalidPatterns verifies that Start returns an error when
// the configuration supplied through envconfig.ObservabilityConfig contains a
// client RPC event with an invalid method pattern.
func (s) TestRefuseStartWithInvalidPatterns(t *testing.T) {
	badEvent := clientRPCEvents{
		Methods:          []string{":-)"},
		MaxMetadataBytes: 30,
		MaxMessageBytes:  30,
	}
	rawCfg, err := json.Marshal(&config{
		ProjectID:    "fake",
		CloudLogging: &cloudLogging{ClientRPCEvents: []clientRPCEvents{badEvent}},
	})
	if err != nil {
		t.Fatalf("failed to convert config to JSON: %v", err)
	}

	// Install the inline config (and no config file) for the duration of the
	// test, then put the previous values back.
	prevCfg, prevCfgFile := envconfig.ObservabilityConfig, envconfig.ObservabilityConfigFile
	defer func() {
		envconfig.ObservabilityConfig, envconfig.ObservabilityConfigFile = prevCfg, prevCfgFile
	}()
	envconfig.ObservabilityConfig, envconfig.ObservabilityConfigFile = string(rawCfg), ""

	// A single invalid pattern must make Start fail instead of being silently
	// tolerated.
	err = Start(context.Background())
	if err == nil {
		t.Fatalf("Invalid patterns not triggering error")
	}
}
// TestRefuseStartWithExcludeAndWildCardAll tests the scenario where the
// observability configuration contains a client RPC event that is marked as
// an exclusion and whose method pattern is the '*' wildcard (match anything).
// Starting the observability system with such a configuration must fail.
func (s) TestRefuseStartWithExcludeAndWildCardAll(t *testing.T) {
	excludeEverything := []clientRPCEvents{{
		Methods:          []string{"*"},
		Exclude:          true,
		MaxMetadataBytes: 30,
		MaxMessageBytes:  30,
	}}
	cfgJSON, err := json.Marshal(&config{
		ProjectID:    "fake",
		CloudLogging: &cloudLogging{ClientRPCEvents: excludeEverything},
	})
	if err != nil {
		t.Fatalf("failed to convert config to JSON: %v", err)
	}

	// The deferred call captures the current values now and restores them
	// when the test returns.
	defer func(cfg, cfgFile string) {
		envconfig.ObservabilityConfig = cfg
		envconfig.ObservabilityConfigFile = cfgFile
	}(envconfig.ObservabilityConfig, envconfig.ObservabilityConfigFile)
	envconfig.ObservabilityConfig = string(cfgJSON)
	envconfig.ObservabilityConfigFile = ""

	// Excluding everything is not a usable configuration and must be rejected.
	if startErr := Start(context.Background()); startErr == nil {
		t.Fatalf("Invalid patterns not triggering error")
	}
}
// createTmpConfigInFileSystem creates a random observability config at a random
// place in the temporary portion of the file system dependent on system. It
// also sets the environment variable GRPC_CONFIG_OBSERVABILITY_JSON to point to
// this created config.
func createTmpConfigInFileSystem(rawJSON string) (func(), error) {
configJSONFile, err := os.CreateTemp(os.TempDir(), "configJSON-")
if err != nil {
return nil, fmt.Errorf("cannot create file %v: %v", configJSONFile.Name(), err)
}
_, err = configJSONFile.Write(json.RawMessage(rawJSON))
if err != nil {
return nil, fmt.Errorf("cannot write marshalled JSON: %v", err)
}
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfigFile = configJSONFile.Name()
return func() {
configJSONFile.Close()
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
}, nil
}
// TestJSONEnvVarSet tests that Start succeeds with a valid observability
// configuration that is stored in a file whose path is published through
// envconfig.ObservabilityConfigFile (see createTmpConfigInFileSystem).
func (s) TestJSONEnvVarSet(t *testing.T) {
	configJSON := `{
"project_id": "fake"
}`
	cleanup, err := createTmpConfigInFileSystem(configJSON)
	// Check the error before deferring cleanup, so that a failed setup is
	// reported as such instead of running a cleanup for a file that was never
	// created.
	if err != nil {
		t.Fatalf("failed to create config in file system: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := Start(ctx); err != nil {
		t.Fatalf("error starting observability with valid config through file system: %v", err)
	}
	defer End()
}
// TestBothConfigEnvVarsSet tests the scenario where both configuration
// sources are set: a config file (envconfig.ObservabilityConfigFile) and an
// inline config (envconfig.ObservabilityConfig). The file is expected to take
// precedence, so Start must return an error when the file's configuration is
// invalid, even though the inline configuration is valid.
func (s) TestBothConfigEnvVarsSet(t *testing.T) {
	invalidConfig := &config{
		ProjectID: "fake",
		CloudLogging: &cloudLogging{
			ClientRPCEvents: []clientRPCEvents{
				{
					Methods:          []string{":-)"},
					MaxMetadataBytes: 30,
					MaxMessageBytes:  30,
				},
			},
		},
	}
	invalidConfigJSON, err := json.Marshal(invalidConfig)
	if err != nil {
		t.Fatalf("failed to convert config to JSON: %v", err)
	}
	cleanup, err := createTmpConfigInFileSystem(string(invalidConfigJSON))
	// Check the error before deferring cleanup, so that a failed setup is
	// reported as such instead of running a cleanup for a file that was never
	// created.
	if err != nil {
		t.Fatalf("failed to create config in file system: %v", err)
	}
	defer cleanup()

	// This configuration should be ignored, as precedence 2.
	validConfig := &config{
		ProjectID: "fake",
		CloudLogging: &cloudLogging{
			ClientRPCEvents: []clientRPCEvents{
				{
					Methods:          []string{"*"},
					MaxMetadataBytes: 30,
					MaxMessageBytes:  30,
				},
			},
		},
	}
	validConfigJSON, err := json.Marshal(validConfig)
	if err != nil {
		t.Fatalf("failed to convert config to JSON: %v", err)
	}
	oldObservabilityConfig := envconfig.ObservabilityConfig
	envconfig.ObservabilityConfig = string(validConfigJSON)
	defer func() {
		envconfig.ObservabilityConfig = oldObservabilityConfig
	}()

	if err := Start(context.Background()); err == nil {
		t.Fatalf("Invalid patterns not triggering error")
	}
}
// TestErrInFileSystemEnvVar tests the scenario where
// envconfig.ObservabilityConfigFile names a location in the file system at
// which no configuration file exists. Start is expected to return an error.
func (s) TestErrInFileSystemEnvVar(t *testing.T) {
	// The deferred call captures the current value now and restores it when
	// the test returns.
	defer func(prev string) {
		envconfig.ObservabilityConfigFile = prev
	}(envconfig.ObservabilityConfigFile)
	envconfig.ObservabilityConfigFile = "/this-file/does-not-exist"

	err := Start(context.Background())
	if err == nil {
		t.Fatalf("Invalid file system path not triggering error")
	}
}
// TestNoEnvSet tests that Start returns an error when neither an inline
// configuration (envconfig.ObservabilityConfig) nor a configuration file
// (envconfig.ObservabilityConfigFile) is set.
func (s) TestNoEnvSet(t *testing.T) {
	oldObservabilityConfig := envconfig.ObservabilityConfig
	oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
	envconfig.ObservabilityConfig = ""
	envconfig.ObservabilityConfigFile = ""
	defer func() {
		envconfig.ObservabilityConfig = oldObservabilityConfig
		envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
	}()
	// If there is no observability config set at all, Start should return an
	// error.
	if err := Start(context.Background()); err == nil {
		t.Fatalf("Start() with no observability config set did not return an error")
	}
}
// TestOpenCensusIntegration sets up the observability system with cloud
// monitoring and cloud tracing enabled and with a fake exporter installed,
// performs a batch of unary RPCs and one full-duplex RPC against a stub
// server, and then waits (up to defaultTestTimeout) for the fake exporter to
// have seen the started/completed RPC count views of both client and server
// as well as at least one span.
func (s) TestOpenCensusIntegration(t *testing.T) {
	// Use a short metrics reporting interval for this test and restore the
	// previous value afterwards, so other tests are not affected.
	oldReportingInterval := defaultMetricsReportingInterval
	defer func() { defaultMetricsReportingInterval = oldReportingInterval }()
	defaultMetricsReportingInterval = time.Millisecond * 100

	// Route everything that would be exported to the fake exporter.
	fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t}
	oldNewExporter := newExporter
	defer func() { newExporter = oldNewExporter }()
	newExporter = func(*config) (tracingMetricsExporter, error) {
		return fe, nil
	}

	openCensusOnConfig := &config{
		ProjectID:       "fake",
		CloudMonitoring: &cloudMonitoring{},
		CloudTrace: &cloudTrace{
			SamplingRate: 1.0,
		},
	}
	cleanup, err := setupObservabilitySystemWithConfig(openCensusOnConfig)
	if err != nil {
		t.Fatalf("error setting up observability %v", err)
	}
	defer cleanup()

	ss := &stubserver.StubServer{
		UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
			return &grpc_testing.SimpleResponse{}, nil
		},
		FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
			// Drain the stream. io.EOF is the normal end of the stream; any
			// other error ends the handler too, instead of being retried
			// forever in a busy loop.
			for {
				if _, err := stream.Recv(); err != nil {
					if err == io.EOF {
						return nil
					}
					return err
				}
			}
		},
	}
	if err := ss.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	for i := 0; i < defaultRequestCount; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
		_, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{Body: testOkPayload}})
		// Release the per-call context right away rather than piling up one
		// deferred cancel per iteration until the test returns.
		cancel()
		if err != nil {
			t.Fatalf("Unexpected error from UnaryCall: %v", err)
		}
	}
	t.Logf("unary call passed count=%v", defaultRequestCount)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := ss.Client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("stream.CloseSend() failed: %v", err)
	}
	if _, err = stream.Recv(); err != io.EOF {
		t.Fatalf("unexpected error: %v, expected an EOF error", err)
	}

	// Views that must have been exported with count data.
	wantCountViews := []string{
		"grpc.io/client/started_rpcs",
		"grpc.io/server/started_rpcs",
		"grpc.io/client/completed_rpcs",
		"grpc.io/server/completed_rpcs",
	}

	// Exporting happens asynchronously, so poll the fake exporter until all
	// expectations hold or ctx expires. The expectations are evaluated at
	// least once, even if ctx is already done.
	var errs []error
	for {
		errs = nil
		fe.mu.RLock()
		for _, name := range wantCountViews {
			if value := fe.SeenViews[name]; value != TypeOpenCensusViewCount {
				errs = append(errs, fmt.Errorf("unexpected type for %s: %s != %s", name, value, TypeOpenCensusViewCount))
			}
		}
		if fe.SeenSpans <= 0 {
			errs = append(errs, fmt.Errorf("unexpected number of seen spans: %v <= 0", fe.SeenSpans))
		}
		fe.mu.RUnlock()
		if len(errs) == 0 || ctx.Err() != nil {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	if len(errs) != 0 {
		t.Fatalf("Invalid OpenCensus export data: %v", errs)
	}
}
// TestCustomTagsTracingMetrics verifies that the custom tags (labels) defined
// in the observability configuration, here set to two hardcoded values, are
// present in the configuration passed to the function that creates the
// exporter.
func (s) TestCustomTagsTracingMetrics(t *testing.T) {
	oldNewExporter := newExporter
	defer func() { newExporter = oldNewExporter }()

	fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t}
	newExporter = func(cfg *config) (tracingMetricsExporter, error) {
		ct := cfg.Labels
		if len(ct) < 2 {
			t.Fatalf("less than 2 custom tags sent in")
		}
		if val, ok := ct["customtag1"]; !ok || val != "wow" {
			t.Fatalf("incorrect custom tag: got %v, want %v", val, "wow")
		}
		if val, ok := ct["customtag2"]; !ok || val != "nice" {
			t.Fatalf("incorrect custom tag: got %v, want %v", val, "nice")
		}
		return fe, nil
	}

	// This configuration is present in the file system, and its custom tags
	// should make it to the created exporter.
	// NOTE(review): "sampling_rate" sits under "cloud_monitoring" here, while
	// TestOpenCensusIntegration sets SamplingRate on the cloud trace config —
	// confirm whether the two keys were swapped by accident.
	configJSON := `{
"project_id": "fake",
"cloud_trace": {},
"cloud_monitoring": {"sampling_rate": 1.0},
"labels":{"customtag1":"wow","customtag2":"nice"}
}`
	cleanup, err := createTmpConfigInFileSystem(configJSON)
	// Check the error before deferring cleanup; previously it was silently
	// overwritten by the result of Start.
	if err != nil {
		t.Fatalf("failed to create config in file system: %v", err)
	}
	defer cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	err = Start(ctx)
	// End is deferred regardless of the outcome of Start; calling End after a
	// failed Start is a supported code path (see TestStartErrorsThenEnd).
	defer End()
	if err != nil {
		t.Fatalf("Start() failed with err: %v", err)
	}
}
// TestStartErrorsThenEnd tests that calling End after Start has returned an
// error works without problems, since this sequence is a possible code path
// in the public observability API.
func (s) TestStartErrorsThenEnd(t *testing.T) {
	cfg := config{
		ProjectID: "fake",
		CloudLogging: &cloudLogging{
			ClientRPCEvents: []clientRPCEvents{{
				Methods:          []string{":-)"},
				MaxMetadataBytes: 30,
				MaxMessageBytes:  30,
			}},
		},
	}
	encoded, err := json.Marshal(&cfg)
	if err != nil {
		t.Fatalf("failed to convert config to JSON: %v", err)
	}

	// Remember the current settings, and restore them when the test returns.
	savedConfig := envconfig.ObservabilityConfig
	savedConfigFile := envconfig.ObservabilityConfigFile
	defer func() {
		envconfig.ObservabilityConfig = savedConfig
		envconfig.ObservabilityConfigFile = savedConfigFile
	}()
	envconfig.ObservabilityConfigFile = ""
	envconfig.ObservabilityConfig = string(encoded)

	// Start must reject the invalid pattern...
	if Start(context.Background()) == nil {
		t.Fatalf("Invalid patterns not triggering error")
	}
	// ...and End must still be callable afterwards.
	End()
}