| /* |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package opencensus |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "reflect" |
| "sort" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/google/go-cmp/cmp" |
| "go.opencensus.io/stats/view" |
| "go.opencensus.io/tag" |
| "go.opencensus.io/trace" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/encoding/gzip" |
| "google.golang.org/grpc/internal/grpctest" |
| "google.golang.org/grpc/internal/leakcheck" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/test/grpc_testing" |
| ) |
| |
| type s struct { |
| grpctest.Tester |
| } |
| |
| func Test(t *testing.T) { |
| grpctest.RunSubTests(t, s{}) |
| } |
| |
| func init() { |
| // OpenCensus, once included in binary, will spawn a global goroutine |
| // recorder that is not controllable by application. |
| // https://github.com/census-instrumentation/opencensus-go/issues/1191 |
| leakcheck.RegisterIgnoreGoroutine("go.opencensus.io/stats/view.(*worker).start") |
| } |
| |
| var defaultTestTimeout = 5 * time.Second |
| |
| type fakeExporter struct { |
| t *testing.T |
| |
| mu sync.RWMutex |
| seenViews map[string]*viewInformation |
| seenSpans []spanInformation |
| } |
| |
| // viewInformation is information Exported from the view package through |
| // ExportView relevant to testing, i.e. a reasonably non flaky expectation of |
| // desired emissions to Exporter. |
| type viewInformation struct { |
| aggType view.AggType |
| aggBuckets []float64 |
| desc string |
| tagKeys []tag.Key |
| rows []*view.Row |
| } |
| |
| func (fe *fakeExporter) ExportView(vd *view.Data) { |
| fe.mu.Lock() |
| defer fe.mu.Unlock() |
| fe.seenViews[vd.View.Name] = &viewInformation{ |
| aggType: vd.View.Aggregation.Type, |
| aggBuckets: vd.View.Aggregation.Buckets, |
| desc: vd.View.Description, |
| tagKeys: vd.View.TagKeys, |
| rows: vd.Rows, |
| } |
| } |
| |
| // compareRows compares rows with respect to the information desired to test. |
| // Both the tags representing the rows and also the data of the row are tested |
| // for equality. Rows are in nondeterministic order when ExportView is called, |
| // but handled inside this function by sorting. |
| func compareRows(rows []*view.Row, rows2 []*view.Row) bool { |
| if len(rows) != len(rows2) { |
| return false |
| } |
| // Sort both rows according to the same rule. This is to take away non |
| // determinism in the row ordering passed to the Exporter, while keeping the |
| // row data. |
| sort.Slice(rows, func(i, j int) bool { |
| return rows[i].String() > rows[j].String() |
| }) |
| |
| sort.Slice(rows2, func(i, j int) bool { |
| return rows2[i].String() > rows2[j].String() |
| }) |
| |
| for i, row := range rows { |
| if !cmp.Equal(row.Tags, rows2[i].Tags, cmp.Comparer(func(a tag.Key, b tag.Key) bool { |
| return a.Name() == b.Name() |
| })) { |
| return false |
| } |
| if !compareData(row.Data, rows2[i].Data) { |
| return false |
| } |
| } |
| return true |
| } |
| |
| // compareData returns whether the two aggregation data's are equal to each |
| // other with respect to parts of the data desired for correct emission. The |
| // function first makes sure the two types of aggregation data are the same, and |
| // then checks the equality for the respective aggregation data type. |
| func compareData(ad view.AggregationData, ad2 view.AggregationData) bool { |
| if ad == nil && ad2 == nil { |
| return true |
| } |
| if ad == nil || ad2 == nil { |
| return false |
| } |
| if reflect.TypeOf(ad) != reflect.TypeOf(ad2) { |
| return false |
| } |
| switch ad1 := ad.(type) { |
| case *view.DistributionData: |
| dd2 := ad2.(*view.DistributionData) |
| // Count and Count Per Buckets are reasonable for correctness, |
| // especially since we verify equality of bucket endpoints elsewhere. |
| if ad1.Count != dd2.Count { |
| return false |
| } |
| for i, count := range ad1.CountPerBucket { |
| if count != dd2.CountPerBucket[i] { |
| return false |
| } |
| } |
| case *view.CountData: |
| cd2 := ad2.(*view.CountData) |
| return ad1.Value == cd2.Value |
| |
| // gRPC open census plugin does not have these next two types of aggregation |
| // data types present, for now just check for type equality between the two |
| // aggregation data points (done above). |
| // case *view.SumData |
| // case *view.LastValueData: |
| } |
| return true |
| } |
| |
| func (vi *viewInformation) Equal(vi2 *viewInformation) bool { |
| if vi == nil && vi2 == nil { |
| return true |
| } |
| if vi == nil || vi2 == nil { |
| return false |
| } |
| if vi.aggType != vi2.aggType { |
| return false |
| } |
| if !cmp.Equal(vi.aggBuckets, vi2.aggBuckets) { |
| return false |
| } |
| if vi.desc != vi2.desc { |
| return false |
| } |
| if !cmp.Equal(vi.tagKeys, vi2.tagKeys, cmp.Comparer(func(a tag.Key, b tag.Key) bool { |
| return a.Name() == b.Name() |
| })) { |
| return false |
| } |
| if !compareRows(vi.rows, vi2.rows) { |
| return false |
| } |
| return true |
| } |
| |
| // distributionDataLatencyCount checks if the view information contains the |
| // desired distrubtion latency total count that falls in buckets of 5 seconds or |
| // less. This must be called with non nil view information that is aggregated |
| // with distribution data. Returns a nil error if correct count information |
| // found, non nil error if correct information not found. |
| func distributionDataLatencyCount(vi *viewInformation, countWant int64) error { |
| var totalCount int64 |
| var largestIndexWithFive int |
| for i, bucket := range vi.aggBuckets { |
| // Distribution for latency is measured in milliseconds, so 5 * 1000 = |
| // 5000. |
| if bucket > 5000 { |
| largestIndexWithFive = i |
| break |
| } |
| } |
| // Iterating through rows sums up data points for all methods. In this case, |
| // a data point for the unary and for the streaming RPC. |
| for _, row := range vi.rows { |
| // This could potentially have an extra measurement in buckets above 5s, |
| // but that's fine. Count of buckets that could contain up to 5s is a |
| // good enough assertion. |
| for i, count := range row.Data.(*view.DistributionData).CountPerBucket { |
| if i >= largestIndexWithFive { |
| break |
| } |
| totalCount = totalCount + count |
| } |
| } |
| if totalCount != countWant { |
| return fmt.Errorf("wrong total count for counts under 5: %v, wantCount: %v", totalCount, countWant) |
| } |
| return nil |
| } |
| |
| // TestAllMetricsOneFunction tests emitted metrics from gRPC. It registers all |
| // the metrics provided by this package. It then configures a system with a gRPC |
| // Client and gRPC server with the OpenCensus Dial and Server Option configured, |
| // and makes a Unary RPC and a Streaming RPC. These two RPCs should cause |
| // certain emissions for each registered metric through the OpenCensus View |
| // package. |
| func (s) TestAllMetricsOneFunction(t *testing.T) { |
| allViews := []*view.View{ |
| ClientStartedRPCsView, |
| ServerStartedRPCsView, |
| ClientCompletedRPCsView, |
| ServerCompletedRPCsView, |
| ClientSentBytesPerRPCView, |
| ClientSentCompressedMessageBytesPerRPCView, |
| ServerSentBytesPerRPCView, |
| ServerSentCompressedMessageBytesPerRPCView, |
| ClientReceivedBytesPerRPCView, |
| ClientReceivedCompressedMessageBytesPerRPCView, |
| ServerReceivedBytesPerRPCView, |
| ServerReceivedCompressedMessageBytesPerRPCView, |
| ClientSentMessagesPerRPCView, |
| ServerSentMessagesPerRPCView, |
| ClientReceivedMessagesPerRPCView, |
| ServerReceivedMessagesPerRPCView, |
| ClientRoundtripLatencyView, |
| ServerLatencyView, |
| ClientAPILatencyView, |
| } |
| view.Register(allViews...) |
| // Unregister unconditionally in this defer to correctly cleanup globals in |
| // error conditions. |
| defer view.Unregister(allViews...) |
| fe := &fakeExporter{ |
| t: t, |
| seenViews: make(map[string]*viewInformation), |
| } |
| view.RegisterExporter(fe) |
| defer view.UnregisterExporter(fe) |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { |
| return &grpc_testing.SimpleResponse{Payload: &grpc_testing.Payload{ |
| Body: make([]byte, 10000), |
| }}, nil |
| }, |
| FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error { |
| for { |
| _, err := stream.Recv() |
| if err == io.EOF { |
| return nil |
| } |
| } |
| }, |
| } |
| if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| // Make two RPC's, a unary RPC and a streaming RPC. These should cause |
| // certain metrics to be emitted. |
| if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{ |
| Body: make([]byte, 10000), |
| }}, grpc.UseCompressor(gzip.Name)); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("ss.Client.FullDuplexCall failed: %f", err) |
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| |
| cmtk := tag.MustNewKey("grpc_client_method") |
| smtk := tag.MustNewKey("grpc_server_method") |
| cstk := tag.MustNewKey("grpc_client_status") |
| sstk := tag.MustNewKey("grpc_server_status") |
| wantMetrics := []struct { |
| metric *view.View |
| wantVI *viewInformation |
| }{ |
| { |
| metric: ClientStartedRPCsView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeCount, |
| aggBuckets: []float64{}, |
| desc: "Number of opened client RPCs, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerStartedRPCsView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeCount, |
| aggBuckets: []float64{}, |
| desc: "Number of opened server RPCs, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientCompletedRPCsView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeCount, |
| aggBuckets: []float64{}, |
| desc: "Number of completed RPCs by method and status.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| cstk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| { |
| Key: cstk, |
| Value: "OK", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| { |
| Key: cstk, |
| Value: "OK", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerCompletedRPCsView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeCount, |
| aggBuckets: []float64{}, |
| desc: "Number of completed RPCs by method and status.", |
| tagKeys: []tag.Key{ |
| smtk, |
| sstk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| { |
| Key: sstk, |
| Value: "OK", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| { |
| Key: sstk, |
| Value: "OK", |
| }, |
| }, |
| Data: &view.CountData{ |
| Value: 1, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientSentBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of sent bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientSentCompressedMessageBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of sent compressed message bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerSentBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of sent bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerSentCompressedMessageBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of sent compressed message bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientReceivedBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of received bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientReceivedCompressedMessageBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of received compressed message bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerReceivedBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of received bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerReceivedCompressedMessageBytesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: bytesDistributionBounds, |
| desc: "Distribution of received compressed message bytes per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientSentMessagesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: countDistributionBounds, |
| desc: "Distribution of sent messages per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerSentMessagesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: countDistributionBounds, |
| desc: "Distribution of sent messages per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientReceivedMessagesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: countDistributionBounds, |
| desc: "Distribution of received messages per RPC, by method.", |
| tagKeys: []tag.Key{ |
| cmtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: cmtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ServerReceivedMessagesPerRPCView, |
| wantVI: &viewInformation{ |
| aggType: view.AggTypeDistribution, |
| aggBuckets: countDistributionBounds, |
| desc: "Distribution of received messages per RPC, by method.", |
| tagKeys: []tag.Key{ |
| smtk, |
| }, |
| rows: []*view.Row{ |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/UnaryCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| { |
| Tags: []tag.Tag{ |
| { |
| Key: smtk, |
| Value: "grpc.testing.TestService/FullDuplexCall", |
| }, |
| }, |
| Data: &view.DistributionData{ |
| Count: 1, |
| CountPerBucket: []int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, |
| }, |
| }, |
| }, |
| }, |
| }, |
| { |
| metric: ClientRoundtripLatencyView, |
| }, |
| { |
| metric: ServerLatencyView, |
| }, |
| // Per call metrics: |
| { |
| metric: ClientAPILatencyView, |
| }, |
| } |
| // Unregister all the views. Unregistering a view causes a synchronous |
| // upload of any collected data for the view to any registered exporters. |
| // Thus, after this unregister call, the exporter has the data to make |
| // assertions on immediately. |
| view.Unregister(allViews...) |
| // Assert the expected emissions for each metric match the expected |
| // emissions. |
| for _, wantMetric := range wantMetrics { |
| metricName := wantMetric.metric.Name |
| var vi *viewInformation |
| if vi = fe.seenViews[metricName]; vi == nil { |
| t.Fatalf("couldn't find %v in the views exported, never collected", metricName) |
| } |
| |
| // For latency metrics, there is a lot of non determinism about |
| // the exact milliseconds of RPCs that finish. Thus, rather than |
| // declare the exact data you want, make sure the latency |
| // measurement points for the two RPCs above fall within buckets |
| // that fall into less than 5 seconds, which is the rpc timeout. |
| if metricName == "grpc.io/client/roundtrip_latency" || metricName == "grpc.io/server/server_latency" || metricName == "grpc.io/client/api_latency" { |
| // RPCs have a context timeout of 5s, so all the recorded |
| // measurements (one per RPC - two total) should fall within 5 |
| // second buckets. |
| if err := distributionDataLatencyCount(vi, 2); err != nil { |
| t.Fatalf("Invalid OpenCensus export view data for metric %v: %v", metricName, err) |
| } |
| continue |
| } |
| if diff := cmp.Diff(vi, wantMetric.wantVI); diff != "" { |
| t.Fatalf("got unexpected viewInformation for metric %v, diff (-got, +want): %v", metricName, diff) |
| } |
| // Note that this test only fatals with one error if a metric fails. |
| // This is fine, as all are expected to pass so if a single one fails |
| // you can figure it out and iterate as needed. |
| } |
| } |
| |
| // TestOpenCensusTags tests this instrumentation code's ability to propagate |
| // OpenCensus tags across the wire. It also tests the server stats handler's |
| // functionality of adding the server method tag for the application to see. The |
| // test makes an Unary RPC without a tag map and with a tag map, and expects to |
| // see a tag map at the application layer with server method tag in the first |
| // case, and a tag map at the application layer with the populated tag map plus |
| // server method tag in second case. |
| func (s) TestOpenCensusTags(t *testing.T) { |
| // This stub servers functions represent the application layer server side. |
| // This is the intended feature being tested: that open census tags |
| // populated at the client side application layer end up at the server side |
| // application layer with the server method tag key in addition to the map |
| // populated at the client side application layer if populated. |
| tmCh := testutils.NewChannel() |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { |
| // Do the sends of the tag maps for assertions in this main testing |
| // goroutine. Do the receives and assertions in a forked goroutine. |
| if tm := tag.FromContext(ctx); tm != nil { |
| tmCh.Send(tm) |
| } else { |
| tmCh.Send(errors.New("no tag map received server side")) |
| } |
| return &grpc_testing.SimpleResponse{}, nil |
| }, |
| } |
| if err := ss.Start([]grpc.ServerOption{ServerOption(TraceOptions{})}, DialOption(TraceOptions{})); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| key1 := tag.MustNewKey("key 1") |
| wg := sync.WaitGroup{} |
| wg.Add(1) |
| readerErrCh := testutils.NewChannel() |
| // Spawn a goroutine to receive and validation two tag maps received by the |
| // server application code. |
| go func() { |
| defer wg.Done() |
| unaryCallMethodName := "grpc.testing.TestService/UnaryCall" |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| // Attempt to receive the tag map from the first RPC. |
| if tm, err := tmCh.Receive(ctx); err == nil { |
| tagMap, ok := tm.(*tag.Map) |
| // Shouldn't happen, this test sends only *tag.Map type on channel. |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm)) |
| } |
| // keyServerMethod should be present in this tag map received server |
| // side. |
| val, ok := tagMap.Value(keyServerMethod) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name())) |
| } |
| if val != unaryCallMethodName { |
| readerErrCh.Send(fmt.Errorf("serverMethod receieved: %v, want server method: %v", val, unaryCallMethodName)) |
| } |
| } else { |
| readerErrCh.Send(fmt.Errorf("error while waiting for a tag map: %v", err)) |
| } |
| readerErrCh.Send(nil) |
| |
| // Attempt to receive the tag map from the second RPC. |
| if tm, err := tmCh.Receive(ctx); err == nil { |
| tagMap, ok := tm.(*tag.Map) |
| // Shouldn't happen, this test sends only *tag.Map type on channel. |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("received wrong type from channel: %T", tm)) |
| } |
| // key1: "value1" populated in the tag map client side should make |
| // its way to server. |
| val, ok := tagMap.Value(key1) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", key1.Name())) |
| } |
| if val != "value1" { |
| readerErrCh.Send(fmt.Errorf("key %v received: %v, want server method: %v", key1.Name(), val, unaryCallMethodName)) |
| } |
| // keyServerMethod should be appended to tag map as well. |
| val, ok = tagMap.Value(keyServerMethod) |
| if !ok { |
| readerErrCh.Send(fmt.Errorf("no key: %v present in OpenCensus tag map", keyServerMethod.Name())) |
| } |
| if val != unaryCallMethodName { |
| readerErrCh.Send(fmt.Errorf("key: %v received: %v, want server method: %v", keyServerMethod.Name(), val, unaryCallMethodName)) |
| } |
| } else { |
| readerErrCh.Send(fmt.Errorf("error while waiting for second tag map: %v", err)) |
| } |
| readerErrCh.Send(nil) |
| }() |
| |
| // Make a unary RPC without populating an OpenCensus tag map. The server |
| // side should receive an OpenCensus tag map containing only the |
| // keyServerMethod. |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| |
| ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| // Should receive a nil error from the readerErrCh, meaning the reader |
| // goroutine successfully received a tag map with the keyServerMethod |
| // populated. |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| |
| tm := &tag.Map{} |
| ctx = tag.NewContext(ctx, tm) |
| ctx, err := tag.New(ctx, tag.Upsert(key1, "value1")) |
| // Setup steps like this can fatal, so easier to do the RPC's and subsequent |
| // sends of the tag maps of the RPC's in main goroutine and have the |
| // corresponding receives and assertions in a forked goroutine. |
| if err != nil { |
| t.Fatalf("Error creating tag map: %v", err) |
| } |
| // Make a unary RPC with a populated OpenCensus tag map. The server side |
| // should receive an OpenCensus tag map containing this populated tag map |
| // with the keyServerMethod tag appended to it. |
| if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| if chErr, err := readerErrCh.Receive(ctx); chErr != nil || err != nil { |
| if err != nil { |
| t.Fatalf("Should have received something from error channel: %v", err) |
| } |
| if chErr != nil { |
| t.Fatalf("Should have received a nil error from channel, instead received: %v", chErr) |
| } |
| } |
| wg.Wait() |
| } |
| |
| // compareSpanContext only checks the equality of the trace options, which |
| // represent whether the span should be sampled. The other fields are checked |
| // for presence in later assertions. |
| func compareSpanContext(sc trace.SpanContext, sc2 trace.SpanContext) bool { |
| return sc.TraceOptions.IsSampled() == sc2.TraceOptions.IsSampled() |
| } |
| |
| func compareMessageEvents(me []trace.MessageEvent, me2 []trace.MessageEvent) bool { |
| if len(me) != len(me2) { |
| return false |
| } |
| // Order matters here, message events are deterministic so no flakiness to |
| // test. |
| for i, e := range me { |
| e2 := me2[i] |
| if e.EventType != e2.EventType { |
| return false |
| } |
| if e.MessageID != e2.MessageID { |
| return false |
| } |
| if e.UncompressedByteSize != e2.UncompressedByteSize { |
| return false |
| } |
| if e.CompressedByteSize != e2.CompressedByteSize { |
| return false |
| } |
| } |
| return true |
| } |
| |
| // compareLinks compares the type of link received compared to the wanted link. |
| func compareLinks(ls []trace.Link, ls2 []trace.Link) bool { |
| if len(ls) != len(ls2) { |
| return false |
| } |
| for i, l := range ls { |
| l2 := ls2[i] |
| if l.Type != l2.Type { |
| return false |
| } |
| } |
| return true |
| } |
| |
| // spanInformation is the information received about the span. This is a subset |
| // of information that is important to verify that gRPC has knobs over, which |
| // goes through a stable OpenCensus API with well defined behavior. This keeps |
| // the robustness of assertions over time. |
| type spanInformation struct { |
| // SpanContext either gets pulled off the wire in certain cases server side |
| // or created. |
| sc trace.SpanContext |
| parentSpanID trace.SpanID |
| spanKind int |
| name string |
| message string |
| messageEvents []trace.MessageEvent |
| status trace.Status |
| links []trace.Link |
| hasRemoteParent bool |
| childSpanCount int |
| } |
| |
| // validateTraceAndSpanIDs checks for consistent trace ID across the full trace. |
| // It also asserts each span has a corresponding generated SpanID, and makes |
| // sure in the case of a server span and a client span, the server span points |
| // to the client span as its parent. This is assumed to be called with spans |
| // from the same RPC (thus the same trace). If called with spanInformation slice |
| // of length 2, it assumes first span is a server span which points to second |
| // span as parent and second span is a client span. These assertions are |
| // orthogonal to pure equality assertions, as this data is generated at runtime, |
| // so can only test relations between IDs (i.e. this part of the data has the |
| // same ID as this part of the data). |
| // |
| // Returns an error in the case of a failing assertion, non nil error otherwise. |
| func validateTraceAndSpanIDs(sis []spanInformation) error { |
| var traceID trace.TraceID |
| for i, si := range sis { |
| // Trace IDs should all be consistent across every span, since this |
| // function assumes called with Span from one RPC, which all fall under |
| // one trace. |
| if i == 0 { |
| traceID = si.sc.TraceID |
| } else { |
| if !cmp.Equal(si.sc.TraceID, traceID) { |
| return fmt.Errorf("TraceIDs should all be consistent: %v, %v", si.sc.TraceID, traceID) |
| } |
| } |
| // Due to the span IDs being 8 bytes, the documentation states that it |
| // is practically a mathematical uncertainty in practice to create two |
| // colliding IDs. Thus, for a presence check (the ID was actually |
| // generated, I will simply compare to the zero value, even though a |
| // zero value is a theoretical possibility of generation). This is |
| // because in practice, this zero value defined by this test will never |
| // collide with the generated ID. |
| if cmp.Equal(si.sc.SpanID, trace.SpanID{}) { |
| return errors.New("span IDs should be populated from the creation of the span") |
| } |
| } |
| // If the length of spans of an RPC is 2, it means there is a server span |
| // which exports first and a client span which exports second. Thus, the |
| // server span should point to the client span as its parent, represented |
| // by its ID. |
| if len(sis) == 2 { |
| if !cmp.Equal(sis[0].parentSpanID, sis[1].sc.SpanID) { |
| return fmt.Errorf("server span should point to the client span as its parent. parentSpanID: %v, clientSpanID: %v", sis[0].parentSpanID, sis[1].sc.SpanID) |
| } |
| } |
| return nil |
| } |
| |
| // Equal compares the constant data of the exported span information that is |
| // important for correctness known before runtime. |
| func (si spanInformation) Equal(si2 spanInformation) bool { |
| if !compareSpanContext(si.sc, si2.sc) { |
| return false |
| } |
| |
| if si.spanKind != si2.spanKind { |
| return false |
| } |
| if si.name != si2.name { |
| return false |
| } |
| if si.message != si2.message { |
| return false |
| } |
| // Ignore attribute comparison because Java doesn't even populate any so not |
| // important for correctness. |
| if !compareMessageEvents(si.messageEvents, si2.messageEvents) { |
| return false |
| } |
| if !cmp.Equal(si.status, si2.status) { |
| return false |
| } |
| // compare link type as link type child is important. |
| if !compareLinks(si.links, si2.links) { |
| return false |
| } |
| if si.hasRemoteParent != si2.hasRemoteParent { |
| return false |
| } |
| return si.childSpanCount == si2.childSpanCount |
| } |
| |
| func (fe *fakeExporter) ExportSpan(sd *trace.SpanData) { |
| fe.mu.Lock() |
| defer fe.mu.Unlock() |
| |
| // Persist the subset of data received that is important for correctness and |
| // to make various assertions on later. Keep the ordering as ordering of |
| // spans is deterministic in the context of one RPC. |
| gotSI := spanInformation{ |
| sc: sd.SpanContext, |
| parentSpanID: sd.ParentSpanID, |
| spanKind: sd.SpanKind, |
| name: sd.Name, |
| message: sd.Message, |
| // annotations - ignore |
| // attributes - ignore, I just left them in from previous but no spec |
| // for correctness so no need to test. Java doesn't even have any |
| // attributes. |
| messageEvents: sd.MessageEvents, |
| status: sd.Status, |
| links: sd.Links, |
| hasRemoteParent: sd.HasRemoteParent, |
| childSpanCount: sd.ChildSpanCount, |
| } |
| fe.seenSpans = append(fe.seenSpans, gotSI) |
| } |
| |
| // TestSpan tests emitted spans from gRPC. It configures a system with a gRPC |
| // Client and gRPC server with the OpenCensus Dial and Server Option configured, |
| // and makes a Unary RPC and a Streaming RPC. This should cause spans with |
| // certain information to be emitted from client and server side for each RPC. |
| func (s) TestSpan(t *testing.T) { |
| fe := &fakeExporter{ |
| t: t, |
| } |
| trace.RegisterExporter(fe) |
| defer trace.UnregisterExporter(fe) |
| |
| so := TraceOptions{ |
| TS: trace.ProbabilitySampler(1), |
| DisableTrace: false, |
| } |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { |
| return &grpc_testing.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error { |
| for { |
| _, err := stream.Recv() |
| if err == io.EOF { |
| return nil |
| } |
| } |
| }, |
| } |
| if err := ss.Start([]grpc.ServerOption{ServerOption(so)}, DialOption(so)); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| // Make a Unary RPC. This should cause a span with message events |
| // corresponding to the request message and response message to be emitted |
| // both from the client and the server. Note that RPCs trigger exports of |
| // corresponding span data synchronously, thus the Span Data is guaranteed |
| // to have been read by exporter and is ready to make assertions on. |
| if _, err := ss.Client.UnaryCall(ctx, &grpc_testing.SimpleRequest{Payload: &grpc_testing.Payload{}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| |
| // The spans received are server first, then client. This is due to the RPC |
| // finishing on the server first. The ordering of message events for a Unary |
| // Call is as follows: (client send, server recv), (server send (server span |
| // end), client recv (client span end)). |
| wantSI := []spanInformation{ |
| { |
| // Sampling rate of 100 percent, so this should populate every span |
| // with the information that this span is being sampled. Here and |
| // every other span emitted in this test. |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindServer, |
| name: "grpc.testing.TestService.UnaryCall", |
| // message id - "must be calculated as two different counters |
| // starting from 1 one for sent messages and one for received |
| // message. This way we guarantee that the values will be consistent |
| // between different implementations. In case of unary calls only |
| // one sent and one received message will be recorded for both |
| // client and server spans." |
| messageEvents: []trace.MessageEvent{ |
| { |
| EventType: trace.MessageEventTypeRecv, |
| MessageID: 1, // First msg recv so 1 (see comment above) |
| UncompressedByteSize: 2, |
| CompressedByteSize: 2, |
| }, |
| { |
| EventType: trace.MessageEventTypeSent, |
| MessageID: 1, // First msg send so 1 (see comment above) |
| }, |
| }, |
| links: []trace.Link{ |
| { |
| Type: trace.LinkTypeChild, |
| }, |
| }, |
| // For some reason, status isn't populated in the data sent to the |
| // exporter. This seems wrong, but it didn't send status in old |
| // instrumentation code, so I'm iffy on it but fine. |
| hasRemoteParent: true, |
| }, |
| { |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindClient, |
| name: "Attempt.grpc.testing.TestService.UnaryCall", |
| messageEvents: []trace.MessageEvent{ |
| { |
| EventType: trace.MessageEventTypeSent, |
| MessageID: 1, // First msg send so 1 (see comment above) |
| UncompressedByteSize: 2, |
| CompressedByteSize: 2, |
| }, |
| { |
| EventType: trace.MessageEventTypeRecv, |
| MessageID: 1, // First msg recv so 1 (see comment above) |
| }, |
| }, |
| hasRemoteParent: false, |
| }, |
| { |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindClient, |
| name: "Sent.grpc.testing.TestService.UnaryCall", |
| hasRemoteParent: false, |
| childSpanCount: 1, |
| }, |
| } |
| if diff := cmp.Diff(fe.seenSpans, wantSI); diff != "" { |
| t.Fatalf("got unexpected spans, diff (-got, +want): %v", diff) |
| } |
| fe.mu.Lock() |
| if err := validateTraceAndSpanIDs(fe.seenSpans); err != nil { |
| fe.mu.Unlock() |
| t.Fatalf("Error in runtime data assertions: %v", err) |
| } |
| if !cmp.Equal(fe.seenSpans[0].parentSpanID, fe.seenSpans[1].sc.SpanID) { |
| t.Fatalf("server span should point to the client attempt span as its parent. parentSpanID: %v, clientAttemptSpanID: %v", fe.seenSpans[0].parentSpanID, fe.seenSpans[1].sc.SpanID) |
| } |
| if !cmp.Equal(fe.seenSpans[1].parentSpanID, fe.seenSpans[2].sc.SpanID) { |
| t.Fatalf("client attempt span should point to the client call span as its parent. parentSpanID: %v, clientCallSpanID: %v", fe.seenSpans[1].parentSpanID, fe.seenSpans[2].sc.SpanID) |
| } |
| |
| fe.seenSpans = nil |
| fe.mu.Unlock() |
| |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
| t.Fatalf("ss.Client.FullDuplexCall failed: %v", err) |
| } |
| // Send two messages. This should be recorded in the emitted spans message |
| // events, with message IDs which increase for each message. |
| if err := stream.Send(&grpc_testing.StreamingOutputCallRequest{}); err != nil { |
| t.Fatalf("stream.Send failed: %v", err) |
| } |
| if err := stream.Send(&grpc_testing.StreamingOutputCallRequest{}); err != nil { |
| t.Fatalf("stream.Send failed: %v", err) |
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| |
| wantSI = []spanInformation{ |
| { |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindServer, |
| name: "grpc.testing.TestService.FullDuplexCall", |
| links: []trace.Link{ |
| { |
| Type: trace.LinkTypeChild, |
| }, |
| }, |
| messageEvents: []trace.MessageEvent{ |
| { |
| EventType: trace.MessageEventTypeRecv, |
| MessageID: 1, // First msg recv so 1 |
| }, |
| { |
| EventType: trace.MessageEventTypeRecv, |
| MessageID: 2, // Second msg recv so 2 |
| }, |
| }, |
| hasRemoteParent: true, |
| }, |
| { |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindClient, |
| name: "Sent.grpc.testing.TestService.FullDuplexCall", |
| hasRemoteParent: false, |
| childSpanCount: 1, |
| }, |
| { |
| sc: trace.SpanContext{ |
| TraceOptions: 1, |
| }, |
| spanKind: trace.SpanKindClient, |
| name: "Attempt.grpc.testing.TestService.FullDuplexCall", |
| messageEvents: []trace.MessageEvent{ |
| { |
| EventType: trace.MessageEventTypeSent, |
| MessageID: 1, // First msg send so 1 |
| }, |
| { |
| EventType: trace.MessageEventTypeSent, |
| MessageID: 2, // Second msg send so 2 |
| }, |
| }, |
| hasRemoteParent: false, |
| }, |
| } |
| fe.mu.Lock() |
| defer fe.mu.Unlock() |
| if diff := cmp.Diff(fe.seenSpans, wantSI); diff != "" { |
| t.Fatalf("got unexpected spans, diff (-got, +want): %v", diff) |
| } |
| if err := validateTraceAndSpanIDs(fe.seenSpans); err != nil { |
| t.Fatalf("Error in runtime data assertions: %v", err) |
| } |
| if !cmp.Equal(fe.seenSpans[0].parentSpanID, fe.seenSpans[2].sc.SpanID) { |
| t.Fatalf("server span should point to the client attempt span as its parent. parentSpanID: %v, clientAttemptSpanID: %v", fe.seenSpans[0].parentSpanID, fe.seenSpans[2].sc.SpanID) |
| } |
| if !cmp.Equal(fe.seenSpans[2].parentSpanID, fe.seenSpans[1].sc.SpanID) { |
| t.Fatalf("client attempt span should point to the client call span as its parent. parentSpanID: %v, clientCallSpanID: %v", fe.seenSpans[2].parentSpanID, fe.seenSpans[1].sc.SpanID) |
| } |
| } |