| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package observability |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "os" |
| "strings" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| "go.opencensus.io/stats/view" |
| "go.opencensus.io/trace" |
| "google.golang.org/grpc/internal/envconfig" |
| "google.golang.org/grpc/internal/grpcsync" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/leakcheck" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/internal/testutils" |
| |
| testgrpc "google.golang.org/grpc/interop/grpc_testing" |
| testpb "google.golang.org/grpc/interop/grpc_testing" |
| ) |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| func init() { |
| // OpenCensus, once included in binary, will spawn a global goroutine |
| // recorder that is not controllable by application. |
| // https://github.com/census-instrumentation/opencensus-go/issues/1191 |
| leakcheck.RegisterIgnoreGoroutine("go.opencensus.io/stats/view.(*worker).start") |
| // google-cloud-go leaks HTTP client. They are aware of this: |
| // https://github.com/googleapis/google-cloud-go/issues/1183 |
| leakcheck.RegisterIgnoreGoroutine("internal/poll.runtime_pollWait") |
| } |
| |
| var ( |
| defaultTestTimeout = 10 * time.Second |
| testOkPayload = []byte{72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100} |
| defaultRequestCount = 24 |
| ) |
| |
| const ( |
| TypeOpenCensusViewDistribution = "distribution" |
| TypeOpenCensusViewCount = "count" |
| TypeOpenCensusViewSum = "sum" |
| TypeOpenCensusViewLastValue = "last_value" |
| ) |
| |
| type fakeOpenCensusExporter struct { |
| // The map of the observed View name and type |
| SeenViews map[string]string |
| // Number of spans |
| SeenSpans int |
| |
| idCh *testutils.Channel |
| |
| t *testing.T |
| mu sync.RWMutex |
| } |
| |
| func (fe *fakeOpenCensusExporter) ExportView(vd *view.Data) { |
| fe.mu.Lock() |
| defer fe.mu.Unlock() |
| for _, row := range vd.Rows { |
| fe.t.Logf("Metrics[%s]", vd.View.Name) |
| switch row.Data.(type) { |
| case *view.DistributionData: |
| fe.SeenViews[vd.View.Name] = TypeOpenCensusViewDistribution |
| case *view.CountData: |
| fe.SeenViews[vd.View.Name] = TypeOpenCensusViewCount |
| case *view.SumData: |
| fe.SeenViews[vd.View.Name] = TypeOpenCensusViewSum |
| case *view.LastValueData: |
| fe.SeenViews[vd.View.Name] = TypeOpenCensusViewLastValue |
| } |
| } |
| } |
| |
| type traceAndSpanID struct { |
| spanName string |
| traceID trace.TraceID |
| spanID trace.SpanID |
| isSampled bool |
| spanKind int |
| } |
| |
| type traceAndSpanIDString struct { |
| traceID string |
| spanID string |
| isSampled bool |
| // SpanKind is the type of span. |
| SpanKind int |
| } |
| |
| // idsToString is a helper that converts from generated trace and span IDs to |
| // the string version stored in trace message events. |
| func (tasi *traceAndSpanID) idsToString(projectID string) traceAndSpanIDString { |
| return traceAndSpanIDString{ |
| traceID: "projects/" + projectID + "/traces/" + tasi.traceID.String(), |
| spanID: tasi.spanID.String(), |
| isSampled: tasi.isSampled, |
| SpanKind: tasi.spanKind, |
| } |
| } |
| |
| func (fe *fakeOpenCensusExporter) ExportSpan(vd *trace.SpanData) { |
| if fe.idCh != nil { |
| // This is what export span sees representing the trace/span ID which |
| // will populate different contexts throughout the system, convert in |
| // caller to string version as the logging code does. |
| fe.idCh.Send(traceAndSpanID{ |
| spanName: vd.Name, |
| traceID: vd.TraceID, |
| spanID: vd.SpanID, |
| isSampled: vd.IsSampled(), |
| spanKind: vd.SpanKind, |
| }) |
| } |
| |
| fe.mu.Lock() |
| defer fe.mu.Unlock() |
| fe.SeenSpans++ |
| fe.t.Logf("Span[%v]", vd.Name) |
| } |
| |
| func (fe *fakeOpenCensusExporter) Flush() {} |
| |
| func (fe *fakeOpenCensusExporter) Close() error { |
| return nil |
| } |
| |
| func (s) TestRefuseStartWithInvalidPatterns(t *testing.T) { |
| invalidConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{":-)"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| invalidConfigJSON, err := json.Marshal(invalidConfig) |
| if err != nil { |
| t.Fatalf("failed to convert config to JSON: %v", err) |
| } |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfig = string(invalidConfigJSON) |
| envconfig.ObservabilityConfigFile = "" |
| defer func() { |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }() |
| // If there is at least one invalid pattern, which should not be silently tolerated. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid patterns not triggering error") |
| } |
| } |
| |
| // TestRefuseStartWithExcludeAndWildCardAll tests the scenario where an |
| // observability configuration is provided with client RPC event specifying to |
| // exclude, and which matches on the '*' wildcard (any). This should cause an |
| // error when trying to start the observability system. |
| func (s) TestRefuseStartWithExcludeAndWildCardAll(t *testing.T) { |
| invalidConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| Exclude: true, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| invalidConfigJSON, err := json.Marshal(invalidConfig) |
| if err != nil { |
| t.Fatalf("failed to convert config to JSON: %v", err) |
| } |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfig = string(invalidConfigJSON) |
| envconfig.ObservabilityConfigFile = "" |
| defer func() { |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }() |
| // If there is at least one invalid pattern, which should not be silently tolerated. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid patterns not triggering error") |
| } |
| } |
| |
| // createTmpConfigInFileSystem creates a random observability config at a random |
| // place in the temporary portion of the file system dependent on system. It |
| // also sets the environment variable GRPC_CONFIG_OBSERVABILITY_JSON to point to |
| // this created config. |
| func createTmpConfigInFileSystem(rawJSON string) (func(), error) { |
| configJSONFile, err := os.CreateTemp(os.TempDir(), "configJSON-") |
| if err != nil { |
| return nil, fmt.Errorf("cannot create file %v: %v", configJSONFile.Name(), err) |
| } |
| _, err = configJSONFile.Write(json.RawMessage(rawJSON)) |
| if err != nil { |
| return nil, fmt.Errorf("cannot write marshalled JSON: %v", err) |
| } |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfigFile = configJSONFile.Name() |
| return func() { |
| configJSONFile.Close() |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }, nil |
| } |
| |
| // TestJSONEnvVarSet tests a valid observability configuration specified by the |
| // GRPC_CONFIG_OBSERVABILITY_JSON environment variable, whose value represents a |
| // file path pointing to a JSON encoded config. |
| func (s) TestJSONEnvVarSet(t *testing.T) { |
| configJSON := `{ |
| "project_id": "fake" |
| }` |
| cleanup, err := createTmpConfigInFileSystem(configJSON) |
| defer cleanup() |
| |
| if err != nil { |
| t.Fatalf("failed to create config in file system: %v", err) |
| } |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err != nil { |
| t.Fatalf("error starting observability with valid config through file system: %v", err) |
| } |
| defer End() |
| } |
| |
| // TestBothConfigEnvVarsSet tests the scenario where both configuration |
| // environment variables are set. The file system environment variable should |
| // take precedence, and an error should return in the case of the file system |
| // configuration being invalid, even if the direct configuration environment |
| // variable is set and valid. |
| func (s) TestBothConfigEnvVarsSet(t *testing.T) { |
| invalidConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{":-)"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| invalidConfigJSON, err := json.Marshal(invalidConfig) |
| if err != nil { |
| t.Fatalf("failed to convert config to JSON: %v", err) |
| } |
| cleanup, err := createTmpConfigInFileSystem(string(invalidConfigJSON)) |
| defer cleanup() |
| if err != nil { |
| t.Fatalf("failed to create config in file system: %v", err) |
| } |
| // This configuration should be ignored, as precedence 2. |
| validConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| validConfigJSON, err := json.Marshal(validConfig) |
| if err != nil { |
| t.Fatalf("failed to convert config to JSON: %v", err) |
| } |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| envconfig.ObservabilityConfig = string(validConfigJSON) |
| defer func() { |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| }() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid patterns not triggering error") |
| } |
| } |
| |
| // TestErrInFileSystemEnvVar tests the scenario where an observability |
| // configuration is specified with environment variable that specifies a |
| // location in the file system for configuration, and this location doesn't have |
| // a file (or valid configuration). |
| func (s) TestErrInFileSystemEnvVar(t *testing.T) { |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfigFile = "/this-file/does-not-exist" |
| defer func() { |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid file system path not triggering error") |
| } |
| } |
| |
| func (s) TestNoEnvSet(t *testing.T) { |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfig = "" |
| envconfig.ObservabilityConfigFile = "" |
| defer func() { |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }() |
| // If there is no observability config set at all, the Start should return an error. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid patterns not triggering error") |
| } |
| } |
| |
| func (s) TestOpenCensusIntegration(t *testing.T) { |
| defaultMetricsReportingInterval = time.Millisecond * 100 |
| fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t} |
| |
| defer func(ne func(config *config) (tracingMetricsExporter, error)) { |
| newExporter = ne |
| }(newExporter) |
| |
| newExporter = func(*config) (tracingMetricsExporter, error) { |
| return fe, nil |
| } |
| |
| openCensusOnConfig := &config{ |
| ProjectID: "fake", |
| CloudMonitoring: &cloudMonitoring{}, |
| CloudTrace: &cloudTrace{ |
| SamplingRate: 1.0, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(openCensusOnConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability %v", err) |
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| for { |
| _, err := stream.Recv() |
| if err == io.EOF { |
| return nil |
| } |
| } |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| for i := 0; i < defaultRequestCount; i++ { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| } |
| t.Logf("unary call passed count=%v", defaultRequestCount) |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) |
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| |
| var errs []error |
| for ctx.Err() == nil { |
| errs = nil |
| fe.mu.RLock() |
| if value := fe.SeenViews["grpc.io/client/api_latency"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/api_latency: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/client/started_rpcs"]; value != TypeOpenCensusViewCount { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/started_rpcs: %s != %s", value, TypeOpenCensusViewCount)) |
| } |
| if value := fe.SeenViews["grpc.io/server/started_rpcs"]; value != TypeOpenCensusViewCount { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/started_rpcs: %s != %s", value, TypeOpenCensusViewCount)) |
| } |
| |
| if value := fe.SeenViews["grpc.io/client/completed_rpcs"]; value != TypeOpenCensusViewCount { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/completed_rpcs: %s != %s", value, TypeOpenCensusViewCount)) |
| } |
| if value := fe.SeenViews["grpc.io/server/completed_rpcs"]; value != TypeOpenCensusViewCount { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/completed_rpcs: %s != %s", value, TypeOpenCensusViewCount)) |
| } |
| if value := fe.SeenViews["grpc.io/client/roundtrip_latency"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/completed_rpcs: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/server/server_latency"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("grpc.io/server/server_latency: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/client/sent_compressed_message_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/sent_compressed_message_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/client/received_compressed_message_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/received_compressed_message_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/server/sent_compressed_message_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/sent_compressed_message_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if value := fe.SeenViews["grpc.io/server/received_compressed_message_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution { |
| errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/received_compressed_message_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution)) |
| } |
| if fe.SeenSpans <= 0 { |
| errs = append(errs, fmt.Errorf("unexpected number of seen spans: %v <= 0", fe.SeenSpans)) |
| } |
| fe.mu.RUnlock() |
| if len(errs) == 0 { |
| break |
| } |
| time.Sleep(100 * time.Millisecond) |
| } |
| if len(errs) != 0 { |
| t.Fatalf("Invalid OpenCensus export data: %v", errs) |
| } |
| } |
| |
| // TestCustomTagsTracingMetrics verifies that the custom tags defined in our |
| // observability configuration and set to two hardcoded values are passed to the |
| // function to create an exporter. |
| func (s) TestCustomTagsTracingMetrics(t *testing.T) { |
| defer func(ne func(config *config) (tracingMetricsExporter, error)) { |
| newExporter = ne |
| }(newExporter) |
| fe := &fakeOpenCensusExporter{SeenViews: make(map[string]string), t: t} |
| newExporter = func(config *config) (tracingMetricsExporter, error) { |
| ct := config.Labels |
| if len(ct) < 1 { |
| t.Fatalf("less than 2 custom tags sent in") |
| } |
| if val, ok := ct["customtag1"]; !ok || val != "wow" { |
| t.Fatalf("incorrect custom tag: got %v, want %v", val, "wow") |
| } |
| if val, ok := ct["customtag2"]; !ok || val != "nice" { |
| t.Fatalf("incorrect custom tag: got %v, want %v", val, "nice") |
| } |
| return fe, nil |
| } |
| |
| // This configuration present in file system and it's defined custom tags should make it |
| // to the created exporter. |
| configJSON := `{ |
| "project_id": "fake", |
| "cloud_trace": {}, |
| "cloud_monitoring": {"sampling_rate": 1.0}, |
| "labels":{"customtag1":"wow","customtag2":"nice"} |
| }` |
| |
| cleanup, err := createTmpConfigInFileSystem(configJSON) |
| if err != nil { |
| t.Fatalf("failed to create config in file system: %v", err) |
| } |
| defer cleanup() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| err = Start(ctx) |
| defer End() |
| if err != nil { |
| t.Fatalf("Start() failed with err: %v", err) |
| } |
| } |
| |
| // TestStartErrorsThenEnd tests that an End call after Start errors works |
| // without problems, as this is a possible codepath in the public observability |
| // API. |
| func (s) TestStartErrorsThenEnd(t *testing.T) { |
| invalidConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{":-)"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| invalidConfigJSON, err := json.Marshal(invalidConfig) |
| if err != nil { |
| t.Fatalf("failed to convert config to JSON: %v", err) |
| } |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| oldObservabilityConfigFile := envconfig.ObservabilityConfigFile |
| envconfig.ObservabilityConfig = string(invalidConfigJSON) |
| envconfig.ObservabilityConfigFile = "" |
| defer func() { |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| envconfig.ObservabilityConfigFile = oldObservabilityConfigFile |
| }() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if err := Start(ctx); err == nil { |
| t.Fatalf("Invalid patterns not triggering error") |
| } |
| End() |
| } |
| |
| // TestLoggingLinkedWithTraceClientSide tests that client side logs get the |
| // trace and span id corresponding to the created Call Level Span for the RPC. |
| func (s) TestLoggingLinkedWithTraceClientSide(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| oldNewLoggingExporter := newLoggingExporter |
| defer func() { |
| newLoggingExporter = oldNewLoggingExporter |
| }() |
| |
| newLoggingExporter = func(context.Context, *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| idCh := testutils.NewChannel() |
| |
| fe := &fakeOpenCensusExporter{ |
| t: t, |
| idCh: idCh, |
| } |
| oldNewExporter := newExporter |
| defer func() { |
| newExporter = oldNewExporter |
| }() |
| |
| newExporter = func(*config) (tracingMetricsExporter, error) { |
| return fe, nil |
| } |
| |
| const projectID = "project-id" |
| tracesAndLogsConfig := &config{ |
| ProjectID: projectID, |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| CloudTrace: &cloudTrace{ |
| SamplingRate: 1.0, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(tracesAndLogsConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability %v", err) |
| } |
| defer cleanup() |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| _, err := stream.Recv() |
| if err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Spawn a goroutine to receive the trace and span ids received by the |
| // exporter corresponding to a Unary RPC. |
| readerErrCh := testutils.NewChannel() |
| unaryDone := grpcsync.NewEvent() |
| go func() { |
| var traceAndSpanIDs []traceAndSpanID |
| val, err := idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok := val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| <-unaryDone.Done() |
| var tasiSent traceAndSpanIDString |
| for _, tasi := range traceAndSpanIDs { |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindClient { |
| tasiSent = tasi.idsToString(projectID) |
| continue |
| } |
| } |
| |
| fle.mu.Lock() |
| for _, tasiSeen := range fle.idsSeen { |
| if diff := cmp.Diff(tasiSeen, &tasiSent, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff != "" { |
| readerErrCh.Send(fmt.Errorf("got unexpected id, should be a client span (-got, +want): %v", diff)) |
| } |
| } |
| |
| fle.entries = nil |
| fle.mu.Unlock() |
| readerErrCh.Send(nil) |
| }() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| unaryDone.Fire() |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| } |
| |
| // TestLoggingLinkedWithTraceServerSide tests that server side logs get the |
| // trace and span id corresponding to the created Server Span for the RPC. |
| func (s) TestLoggingLinkedWithTraceServerSide(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| oldNewLoggingExporter := newLoggingExporter |
| defer func() { |
| newLoggingExporter = oldNewLoggingExporter |
| }() |
| |
| newLoggingExporter = func(context.Context, *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| idCh := testutils.NewChannel() |
| |
| fe := &fakeOpenCensusExporter{ |
| t: t, |
| idCh: idCh, |
| } |
| oldNewExporter := newExporter |
| defer func() { |
| newExporter = oldNewExporter |
| }() |
| |
| newExporter = func(*config) (tracingMetricsExporter, error) { |
| return fe, nil |
| } |
| |
| const projectID = "project-id" |
| tracesAndLogsConfig := &config{ |
| ProjectID: projectID, |
| CloudLogging: &cloudLogging{ |
| ServerRPCEvents: []serverRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| CloudTrace: &cloudTrace{ |
| SamplingRate: 1.0, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(tracesAndLogsConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability %v", err) |
| } |
| defer cleanup() |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| _, err := stream.Recv() |
| if err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Spawn a goroutine to receive the trace and span ids received by the |
| // exporter corresponding to a Unary RPC. |
| readerErrCh := testutils.NewChannel() |
| unaryDone := grpcsync.NewEvent() |
| go func() { |
| var traceAndSpanIDs []traceAndSpanID |
| val, err := idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok := val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| <-unaryDone.Done() |
| var tasiServer traceAndSpanIDString |
| for _, tasi := range traceAndSpanIDs { |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindServer { |
| tasiServer = tasi.idsToString(projectID) |
| continue |
| } |
| } |
| |
| fle.mu.Lock() |
| for _, tasiSeen := range fle.idsSeen { |
| if diff := cmp.Diff(tasiSeen, &tasiServer, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff != "" { |
| readerErrCh.Send(fmt.Errorf("got unexpected id, should be a server span (-got, +want): %v", diff)) |
| } |
| } |
| |
| fle.entries = nil |
| fle.mu.Unlock() |
| readerErrCh.Send(nil) |
| }() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| unaryDone.Fire() |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| } |
| |
| // TestLoggingLinkedWithTrace tests that client and server side logs get the |
| // trace and span id corresponding to either the Call Level Span or Server Span |
| // (no determinism, so can only assert one or the other), for Unary and |
| // Streaming RPCs. |
| func (s) TestLoggingLinkedWithTrace(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| oldNewLoggingExporter := newLoggingExporter |
| defer func() { |
| newLoggingExporter = oldNewLoggingExporter |
| }() |
| |
| newLoggingExporter = func(context.Context, *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| idCh := testutils.NewChannel() |
| |
| fe := &fakeOpenCensusExporter{ |
| t: t, |
| idCh: idCh, |
| } |
| oldNewExporter := newExporter |
| defer func() { |
| newExporter = oldNewExporter |
| }() |
| |
| newExporter = func(*config) (tracingMetricsExporter, error) { |
| return fe, nil |
| } |
| |
| const projectID = "project-id" |
| tracesAndLogsConfig := &config{ |
| ProjectID: projectID, |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| ServerRPCEvents: []serverRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| CloudTrace: &cloudTrace{ |
| SamplingRate: 1.0, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(tracesAndLogsConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability %v", err) |
| } |
| defer cleanup() |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| _, err := stream.Recv() |
| if err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Spawn a goroutine to receive the trace and span ids received by the |
| // exporter corresponding to a Unary RPC. |
| readerErrCh := testutils.NewChannel() |
| unaryDone := grpcsync.NewEvent() |
| go func() { |
| var traceAndSpanIDs []traceAndSpanID |
| val, err := idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok := val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| <-unaryDone.Done() |
| var tasiSent traceAndSpanIDString |
| var tasiServer traceAndSpanIDString |
| for _, tasi := range traceAndSpanIDs { |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindClient { |
| tasiSent = tasi.idsToString(projectID) |
| continue |
| } |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindServer { |
| tasiServer = tasi.idsToString(projectID) |
| } |
| } |
| |
| fle.mu.Lock() |
| for _, tasiSeen := range fle.idsSeen { |
| if diff := cmp.Diff(tasiSeen, &tasiSent, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff != "" { |
| if diff2 := cmp.Diff(tasiSeen, &tasiServer, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff2 != "" { |
| readerErrCh.Send(fmt.Errorf("got unexpected id, should be a client or server span (-got, +want): %v, %v", diff, diff2)) |
| } |
| } |
| } |
| |
| fle.entries = nil |
| fle.mu.Unlock() |
| readerErrCh.Send(nil) |
| }() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: testOkPayload}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| unaryDone.Fire() |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| |
| fle.mu.Lock() |
| fle.idsSeen = nil |
| fle.mu.Unlock() |
| |
| // Test streaming. Spawn a goroutine to receive the trace and span ids |
| // received by the exporter corresponding to a streaming RPC. |
| readerErrCh = testutils.NewChannel() |
| streamDone := grpcsync.NewEvent() |
| go func() { |
| var traceAndSpanIDs []traceAndSpanID |
| |
| val, err := idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok := val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| |
| val, err = idCh.Receive(ctx) |
| if err != nil { |
| readerErrCh.Send(fmt.Errorf("error while waiting for IDs: %v", err)) |
| } |
| tasi, ok = val.(traceAndSpanID) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", val)) |
| } |
| traceAndSpanIDs = append(traceAndSpanIDs, tasi) |
| <-streamDone.Done() |
| var tasiSent traceAndSpanIDString |
| var tasiServer traceAndSpanIDString |
| for _, tasi := range traceAndSpanIDs { |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindClient { |
| tasiSent = tasi.idsToString(projectID) |
| continue |
| } |
| if strings.HasPrefix(tasi.spanName, "grpc.") && tasi.spanKind == trace.SpanKindServer { |
| tasiServer = tasi.idsToString(projectID) |
| } |
| } |
| |
| fle.mu.Lock() |
| for _, tasiSeen := range fle.idsSeen { |
| if diff := cmp.Diff(tasiSeen, &tasiSent, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff != "" { |
| if diff2 := cmp.Diff(tasiSeen, &tasiServer, cmp.AllowUnexported(traceAndSpanIDString{}), cmpopts.IgnoreFields(traceAndSpanIDString{}, "SpanKind")); diff2 != "" { |
| readerErrCh.Send(fmt.Errorf("got unexpected id, should be a client or server span (-got, +want): %v, %v", diff, diff2)) |
| } |
| } |
| } |
| |
| fle.entries = nil |
| fle.mu.Unlock() |
| readerErrCh.Send(nil) |
| }() |
| |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) |
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| streamDone.Fire() |
| |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| } |