blob: e58a464527579d9f070479fe358e19c84c582bcd [file] [log] [blame]
* Copyright 2022 gRPC authors.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package observability
import (
// s is the suite type that Test passes to grpctest.RunSubTests; the Test*
// functions in this file are declared as methods on it.
type s struct {
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
// NOTE(review): only the explanatory comments of this function are visible in
// this span. Presumably each comment is followed by a statement that exempts
// the described goroutine from leak checking — confirm against the full file.
func init() {
// OpenCensus, once included in the binary, spawns a global recorder goroutine
// that the application cannot control.
// google-cloud-go leaks an HTTP client. Its maintainers are aware of this:
var (
defaultTestTimeout = 10 * time.Second
testHeaderMetadata = metadata.MD{"header": []string{"HeADer"}}
testTrailerMetadata = metadata.MD{"trailer": []string{"TrAileR"}}
testOkPayload = []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100}
testErrorPayload = []byte{77, 97, 114, 116, 104, 97}
testErrorMessage = "test case injected error"
infinitySizeBytes int32 = 1024 * 1024 * 1024
defaultRequestCount = 24
// Kinds of view data that fakeOpenCensusExporter.ExportView stores in
// SeenViews. Only the first constant carries an explicit string type; the
// remaining three are untyped string constants.
const (
TypeOpenCensusViewDistribution string = "distribution"
TypeOpenCensusViewCount = "count"
TypeOpenCensusViewSum = "sum"
TypeOpenCensusViewLastValue = "last_value"
// fakeOpenCensusExporter is a test double for the tracing/metrics exporter: it
// records what it is asked to export so that tests can inspect it.
type fakeOpenCensusExporter struct {
// SeenViews maps the name of each observed view to the kind of data exported
// for it (one of the TypeOpenCensusView* constants).
SeenViews map[string]string
// SeenSpans is the number of spans observed.
SeenSpans int
// t is the running test; the export methods log through it.
t *testing.T
// NOTE(review): presumably guards SeenViews and SeenSpans; no Lock/Unlock call
// is visible in this span — confirm against the full file.
mu sync.RWMutex
func (fe *fakeOpenCensusExporter) ExportView(vd *view.Data) {
for _, row := range vd.Rows {
fe.t.Logf("Metrics[%s]", vd.View.Name)
switch row.Data.(type) {
case *view.DistributionData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewDistribution
case *view.CountData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewCount
case *view.SumData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewSum
case *view.LastValueData:
fe.SeenViews[vd.View.Name] = TypeOpenCensusViewLastValue
func (fe *fakeOpenCensusExporter) ExportSpan(vd *trace.SpanData) {
fe.t.Logf("Span[%v]", vd.Name)
// Flush does nothing.
func (fe *fakeOpenCensusExporter) Flush() {}
// Close does nothing and always returns nil.
func (fe *fakeOpenCensusExporter) Close() error {
return nil
func (s) TestRefuseStartWithInvalidPatterns(t *testing.T) {
invalidConfig := &config{
ProjectID: "fake",
CloudLogging: &cloudLogging{
ClientRPCEvents: []clientRPCEvents{
Methods: []string{":-)"},
MaxMetadataBytes: 30,
MaxMessageBytes: 30,
invalidConfigJSON, err := json.Marshal(invalidConfig)
if err != nil {
t.Fatalf("failed to convert config to JSON: %v", err)
oldObservabilityConfig := envconfig.ObservabilityConfig
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfig = string(invalidConfigJSON)
envconfig.ObservabilityConfigFile = ""
defer func() {
envconfig.ObservabilityConfig = oldObservabilityConfig
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
// If there is at least one invalid pattern, which should not be silently tolerated.
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid patterns not triggering error")
// TestRefuseStartWithExcludeAndWildCardAll tests the sceanrio where an
// observability configuration is provided with client RPC event specifying to
// exclude, and which matches on the '*' wildcard (any). This should cause an
// error when trying to start the observability system.
func (s) TestRefuseStartWithExcludeAndWildCardAll(t *testing.T) {
invalidConfig := &config{
ProjectID: "fake",
CloudLogging: &cloudLogging{
ClientRPCEvents: []clientRPCEvents{
Methods: []string{"*"},
Exclude: true,
MaxMetadataBytes: 30,
MaxMessageBytes: 30,
invalidConfigJSON, err := json.Marshal(invalidConfig)
if err != nil {
t.Fatalf("failed to convert config to JSON: %v", err)
oldObservabilityConfig := envconfig.ObservabilityConfig
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfig = string(invalidConfigJSON)
envconfig.ObservabilityConfigFile = ""
defer func() {
envconfig.ObservabilityConfig = oldObservabilityConfig
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
// If there is at least one invalid pattern, which should not be silently tolerated.
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid patterns not triggering error")
// createTmpConfigInFileSystem creates a random observability config at a random
// place in the temporary portion of the file system dependent on system. It
// also sets the environment variable GRPC_CONFIG_OBSERVABILITY_JSON to point to
// this created config.
func createTmpConfigInFileSystem(rawJSON string) (func(), error) {
configJSONFile, err := os.CreateTemp(os.TempDir(), "configJSON-")
if err != nil {
return nil, fmt.Errorf("cannot create file %v: %v", configJSONFile.Name(), err)
_, err = configJSONFile.Write(json.RawMessage(rawJSON))
if err != nil {
return nil, fmt.Errorf("cannot write marshalled JSON: %v", err)
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfigFile = configJSONFile.Name()
return func() {
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
}, nil
// TestJSONEnvVarSet tests a valid observability configuration specified by the
// GRPC_CONFIG_OBSERVABILITY_JSON environment variable, whose value represents a
// file path pointing to a JSON encoded config.
func (s) TestJSONEnvVarSet(t *testing.T) {
configJSON := `{
"project_id": "fake"
cleanup, err := createTmpConfigInFileSystem(configJSON)
defer cleanup()
if err != nil {
t.Fatalf("failed to create config in file system: %v", err)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := Start(ctx); err != nil {
t.Fatalf("error starting observability with valid config through file system: %v", err)
defer End()
// TestBothConfigEnvVarsSet tests the scenario where both configuration
// environment variables are set. The file system environment variable should
// take precedence, and an error should return in the case of the file system
// configuration being invalid, even if the direct configuration environment
// variable is set and valid.
func (s) TestBothConfigEnvVarsSet(t *testing.T) {
invalidConfig := &config{
ProjectID: "fake",
CloudLogging: &cloudLogging{
ClientRPCEvents: []clientRPCEvents{
Methods: []string{":-)"},
MaxMetadataBytes: 30,
MaxMessageBytes: 30,
invalidConfigJSON, err := json.Marshal(invalidConfig)
if err != nil {
t.Fatalf("failed to convert config to JSON: %v", err)
cleanup, err := createTmpConfigInFileSystem(string(invalidConfigJSON))
defer cleanup()
if err != nil {
t.Fatalf("failed to create config in file system: %v", err)
// This configuration should be ignored, as precedence 2.
validConfig := &config{
ProjectID: "fake",
CloudLogging: &cloudLogging{
ClientRPCEvents: []clientRPCEvents{
Methods: []string{"*"},
MaxMetadataBytes: 30,
MaxMessageBytes: 30,
validConfigJSON, err := json.Marshal(validConfig)
if err != nil {
t.Fatalf("failed to convert config to JSON: %v", err)
oldObservabilityConfig := envconfig.ObservabilityConfig
envconfig.ObservabilityConfig = string(validConfigJSON)
defer func() {
envconfig.ObservabilityConfig = oldObservabilityConfig
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid patterns not triggering error")
// TestErrInFileSystemEnvVar tests the scenario where an observability
// configuration is specified with environment variable that specifies a
// location in the file system for configuration, and this location doesn't have
// a file (or valid configuration).
func (s) TestErrInFileSystemEnvVar(t *testing.T) {
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfigFile = "/this-file/does-not-exist"
defer func() {
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid file system path not triggering error")
func (s) TestNoEnvSet(t *testing.T) {
oldObservabilityConfig := envconfig.ObservabilityConfig
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfig = ""
envconfig.ObservabilityConfigFile = ""
defer func() {
envconfig.ObservabilityConfig = oldObservabilityConfig
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
// If there is no observability config set at all, the Start should return an error.
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid patterns not triggering error")
// TestOpenCensusIntegration replaces newExporter with a function returning a
// fakeOpenCensusExporter, sets up observability with Cloud Monitoring and
// Cloud Trace configured, issues defaultRequestCount unary RPCs and one
// full-duplex stream against a stub server, and then polls the fake exporter
// for the expected views and at least one span until the context expires.
//
// NOTE(review): several lines of this function are not visible in this span
// (closing lines, and apparently the view names used to index SeenViews). The
// notes below mark what should be checked against the full file.
func (s) TestOpenCensusIntegration(t *testing.T) {
// NOTE(review): this package-level value is overwritten and not restored in
// the visible lines.
defaultMetricsReportingInterval = time.Millisecond * 100
fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t}
// Restore the original newExporter when the test returns.
defer func(ne func(config *config) (tracingMetricsExporter, error)) {
newExporter = ne
newExporter = func(config *config) (tracingMetricsExporter, error) {
return fe, nil
openCensusOnConfig := &config{
ProjectID: "fake",
CloudMonitoring: &cloudMonitoring{},
CloudTrace: &cloudTrace{
SamplingRate: 1.0,
cleanup, err := setupObservabilitySystemWithConfig(openCensusOnConfig)
if err != nil {
t.Fatalf("error setting up observability %v", err)
defer cleanup()
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
return &grpc_testing.SimpleResponse{}, nil
FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error {
// NOTE(review): only the io.EOF case is handled in the visible lines; confirm
// how the loop ends when Recv returns a different error.
for {
_, err := stream.Recv()
if err == io.EOF {
return nil
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
defer ss.Stop()
for i := 0; i < defaultRequestCount; i++ {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
// NOTE(review): defer inside a loop — each cancel runs only when the test
// function returns, not at the end of its iteration.
defer cancel()
if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{Body: testOkPayload}}); err != nil {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
t.Logf("unary call passed count=%v", defaultRequestCount)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
// NOTE(review): %f is a floating-point verb, but err is an error; %v is the
// fitting verb for this message.
t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
if _, err = stream.Recv(); err != io.EOF {
t.Fatalf("unexpected error: %v, expected an EOF error", err)
// Poll the fake exporter until every check passes or ctx expires; errs holds
// the failures of the most recent round.
var errs []error
for ctx.Err() == nil {
errs = nil
// NOTE(review): fe.SeenViews and fe.SeenSpans are read below without fe.mu
// held in the visible lines — confirm synchronization with the exporter.
// NOTE(review): all four checks index SeenViews with the same empty key, and
// their messages have one fewer operand than the wording suggests; the view
// names appear to be missing from this span.
if value := fe.SeenViews[""]; value != TypeOpenCensusViewCount {
errs = append(errs, fmt.Errorf("unexpected type for %s != %s", value, TypeOpenCensusViewCount))
if value := fe.SeenViews[""]; value != TypeOpenCensusViewCount {
errs = append(errs, fmt.Errorf("unexpected type for %s != %s", value, TypeOpenCensusViewCount))
if value := fe.SeenViews[""]; value != TypeOpenCensusViewCount {
errs = append(errs, fmt.Errorf("unexpected type for %s != %s", value, TypeOpenCensusViewCount))
if value := fe.SeenViews[""]; value != TypeOpenCensusViewCount {
errs = append(errs, fmt.Errorf("unexpected type for %s != %s", value, TypeOpenCensusViewCount))
if fe.SeenSpans <= 0 {
errs = append(errs, fmt.Errorf("unexpected number of seen spans: %v <= 0", fe.SeenSpans))
// NOTE(review): the body of this branch is not visible; presumably it leaves
// the polling loop once a round produced no errors — confirm.
if len(errs) == 0 {
time.Sleep(100 * time.Millisecond)
if len(errs) != 0 {
t.Fatalf("Invalid OpenCensus export data: %v", errs)
// TestCustomTagsTracingMetrics verifies that the custom tags defined in the
// observability configuration reach the function that creates the exporter:
// the replacement newExporter checks that config.Labels maps "customtag1" to
// "wow" and "customtag2" to "nice".
//
// NOTE(review): points to check against the full file:
//   - The configuration literal below looks truncated in this span (it ends on
//     a trailing comma, and no entry supplying the two custom tags is visible).
//   - The error returned by createTmpConfigInFileSystem is never inspected; it
//     is overwritten by the result of Start, and cleanup is deferred before
//     any error check.
//   - The guard is len(ct) < 1 while its message says "less than 2".
//   - "sampling_rate" is placed under "cloud_monitoring", whereas
//     TestOpenCensusIntegration sets SamplingRate on CloudTrace.
func (s) TestCustomTagsTracingMetrics(t *testing.T) {
// Restore the original newExporter when the test returns.
defer func(ne func(config *config) (tracingMetricsExporter, error)) {
newExporter = ne
fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t}
newExporter = func(config *config) (tracingMetricsExporter, error) {
ct := config.Labels
if len(ct) < 1 {
t.Fatalf("less than 2 custom tags sent in")
if val, ok := ct["customtag1"]; !ok || val != "wow" {
t.Fatalf("incorrect custom tag: got %v, want %v", val, "wow")
if val, ok := ct["customtag2"]; !ok || val != "nice" {
t.Fatalf("incorrect custom tag: got %v, want %v", val, "nice")
return fe, nil
// This configuration is written to the file system, and its custom tags
// should make it to the created exporter.
configJSON := `{
"project_id": "fake",
"cloud_trace": {},
"cloud_monitoring": {"sampling_rate": 1.0},
cleanup, err := createTmpConfigInFileSystem(configJSON)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
err = Start(ctx)
defer End()
if err != nil {
t.Fatalf("Start() failed with err: %v", err)
// TestStartErrorsThenEnd tests that an End call after Start errors works
// without problems, as this is a possible codepath in the public observability
// API.
//
// NOTE(review): no End call is visible within this span; the function may
// continue past the last visible line — confirm against the full file.
func (s) TestStartErrorsThenEnd(t *testing.T) {
invalidConfig := &config{
ProjectID: "fake",
CloudLogging: &cloudLogging{
ClientRPCEvents: []clientRPCEvents{
Methods: []string{":-)"},
MaxMetadataBytes: 30,
MaxMessageBytes: 30,
invalidConfigJSON, err := json.Marshal(invalidConfig)
if err != nil {
t.Fatalf("failed to convert config to JSON: %v", err)
// Use the inline configuration only; both settings are restored by the
// deferred function below.
oldObservabilityConfig := envconfig.ObservabilityConfig
oldObservabilityConfigFile := envconfig.ObservabilityConfigFile
envconfig.ObservabilityConfig = string(invalidConfigJSON)
envconfig.ObservabilityConfigFile = ""
defer func() {
envconfig.ObservabilityConfig = oldObservabilityConfig
envconfig.ObservabilityConfigFile = oldObservabilityConfigFile
// Start is expected to fail on the invalid method pattern.
if err := Start(context.Background()); err == nil {
t.Fatalf("Invalid patterns not triggering error")