| /* |
| * |
| * Copyright 2022 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package observability |
| |
| import ( |
| "context" |
| "encoding/base64" |
| "encoding/json" |
| "fmt" |
| "io" |
| "strings" |
| "sync" |
| "testing" |
| |
| gcplogging "cloud.google.com/go/logging" |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| "google.golang.org/grpc/internal/envconfig" |
| "google.golang.org/grpc/internal/stubserver" |
| "google.golang.org/grpc/metadata" |
| |
| binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" |
| testgrpc "google.golang.org/grpc/interop/grpc_testing" |
| testpb "google.golang.org/grpc/interop/grpc_testing" |
| ) |
| |
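// cmpLoggingEntryList compares the grpcLogEntries received by the exporter
// against the expected ones, ignoring fields that are nondeterministic across
// runs (call IDs, peer addresses, and timeouts).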
| func cmpLoggingEntryList(got []*grpcLogEntry, want []*grpcLogEntry) error { |
| if diff := cmp.Diff(got, want, |
// Metadata iteration order is nondeterministic, so compare the maps entry
// by entry. The smaller map is treated as the comparand: the maps match if
// every entry in it appears in the larger map, unless one map is empty and
// the other is not.
cmp.Comparer(func(a map[string]string, b map[string]string) bool {
if len(a) > len(b) {
a, b = b, a
}
if len(a) == 0 && len(a) != len(b) { // One side has no metadata but the other does.
return false
}
| for k, v := range a { |
| if b[k] != v { |
| return false |
| } |
| } |
| return true |
| }), |
| cmpopts.IgnoreFields(grpcLogEntry{}, "CallID", "Peer"), |
| cmpopts.IgnoreFields(address{}, "IPPort", "Type"), |
| cmpopts.IgnoreFields(payload{}, "Timeout")); diff != "" { |
| return fmt.Errorf("got unexpected grpcLogEntry list, diff (-got, +want): %v", diff) |
| } |
| return nil |
| } |
| |
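// fakeLoggingExporter is an in-memory logging exporter that records every
// grpcLogEntry and trace/span ID it receives, for verification by tests.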
| type fakeLoggingExporter struct { |
| t *testing.T |
| |
| mu sync.Mutex |
| entries []*grpcLogEntry |
| |
| idsSeen []*traceAndSpanIDString |
| } |
| |
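// EmitGcpLoggingEntry records the trace/span IDs and the grpcLogEntry payload
// of the emitted entry, and verifies that the entry carries the expected
// hardcoded severity.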
| func (fle *fakeLoggingExporter) EmitGcpLoggingEntry(entry gcplogging.Entry) { |
| fle.mu.Lock() |
| defer fle.mu.Unlock() |
if entry.Severity != 100 {
fle.t.Errorf("entry.Severity = %v, want 100 (the hardcoded severity)", entry.Severity)
}
| |
| ids := &traceAndSpanIDString{ |
| traceID: entry.Trace, |
| spanID: entry.SpanID, |
| isSampled: entry.TraceSampled, |
| } |
| fle.idsSeen = append(fle.idsSeen, ids) |
| |
grpcLogEntry, ok := entry.Payload.(*grpcLogEntry)
if !ok {
fle.t.Errorf("entry.Payload is of type %T, want *grpcLogEntry", entry.Payload)
return
}
| fle.entries = append(fle.entries, grpcLogEntry) |
| } |
| |
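// Close is a no-op for the fake exporter.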
| func (fle *fakeLoggingExporter) Close() error { |
| return nil |
| } |
| |
| // setupObservabilitySystemWithConfig sets up the observability system with the |
| // specified config, and returns a function which cleans up the observability |
| // system. |
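//
// The config is marshaled to JSON and installed through
// envconfig.ObservabilityConfig before Start is called; the exact JSON shape
// (presumably something like {"project_id": "fake", "cloud_logging": {...}})
// depends on the struct tags on config, which are defined elsewhere in this
// package.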
| func setupObservabilitySystemWithConfig(cfg *config) (func(), error) { |
| validConfigJSON, err := json.Marshal(cfg) |
| if err != nil { |
| return nil, fmt.Errorf("failed to convert config to JSON: %v", err) |
| } |
| oldObservabilityConfig := envconfig.ObservabilityConfig |
| envconfig.ObservabilityConfig = string(validConfigJSON) |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| err = Start(ctx) |
| cleanup := func() { |
| End() |
| envconfig.ObservabilityConfig = oldObservabilityConfig |
| } |
| if err != nil { |
| return cleanup, fmt.Errorf("error in Start: %v", err) |
| } |
| return cleanup, nil |
| } |
| |
| // TestClientRPCEventsLogAll tests the observability system configured with a |
| // client RPC event that logs every call. It performs a Unary and Bidirectional |
// Streaming RPC, and expects certain grpcLogEntries to make their way to the
| // exporter. |
| func (s) TestClientRPCEventsLogAll(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| clientRPCEventLogAllConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(clientRPCEventLogAllConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability: %v", err) |
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| if _, err := stream.Recv(); err != nil { |
| return err |
| } |
| if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { |
| return err |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| |
| grpcLogEntriesWant := []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| Payload: payload{ |
| Message: []uint8{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 4, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| |
| fle.entries = nil |
| fle.mu.Unlock() |
| |
| // Make a streaming RPC. This should cause Log calls on the MethodLogger. |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
| } |
| if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { |
| t.Fatalf("stream.Send() failed: %v", err) |
| } |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("stream.Recv() failed: %v", err) |
| } |
| if err := stream.CloseSend(); err != nil { |
t.Fatalf("stream.CloseSend() failed: %v", err)
| } |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| grpcLogEntriesWant = []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| Payload: payload{ |
| Message: []uint8{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 4, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeClientHalfClose, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 6, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.mu.Unlock() |
| } |
| |
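// TestServerRPCEventsLogAll tests the observability system configured with a
// server RPC event that logs every call. It performs a Unary and Bidirectional
// Streaming RPC, and expects certain grpcLogEntries, logged from the server
// side, to make their way to the exporter.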
| func (s) TestServerRPCEventsLogAll(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| serverRPCEventLogAllConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ServerRPCEvents: []serverRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(serverRPCEventLogAllConfig) |
| if err != nil { |
t.Fatalf("error setting up observability: %v", err)
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| if _, err := stream.Recv(); err != nil { |
| return err |
| } |
| if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil { |
| return err |
| } |
| if _, err := stream.Recv(); err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| grpcLogEntriesWant := []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 4, |
| Payload: payload{ |
| Message: []uint8{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.entries = nil |
| fle.mu.Unlock() |
| |
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
| } |
| if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { |
| t.Fatalf("stream.Send() failed: %v", err) |
| } |
| if _, err := stream.Recv(); err != nil { |
| t.Fatalf("stream.Recv() failed: %v", err) |
| } |
| if err := stream.CloseSend(); err != nil { |
t.Fatalf("stream.CloseSend() failed: %v", err)
| } |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| |
| grpcLogEntriesWant = []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 4, |
| Authority: ss.Address, |
| Payload: payload{ |
| Message: []uint8{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientHalfClose, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerServer, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 6, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.mu.Unlock() |
| } |
| |
// TestBothClientAndServerRPCEvents tests the scenario where both Client and
// Server RPC Events are configured to log. Both sides should log and share
// the exporter, so the exporter should receive the collective entries of both
// a client stream (corresponding to a Client RPC Event) and a server stream
// (corresponding to a Server RPC Event). The specificity of the entries is
// tested in previous tests.
| func (s) TestBothClientAndServerRPCEvents(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| serverRPCEventLogAllConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| ServerRPCEvents: []serverRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| |
| cleanup, err := setupObservabilitySystemWithConfig(serverRPCEventLogAllConfig) |
| if err != nil { |
t.Fatalf("error setting up observability: %v", err)
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| _, err := stream.Recv() |
| if err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| // Make a Unary RPC. Both client side and server side streams should log |
| // entries, which share the same exporter. The exporter should thus receive |
| // entries from both the client and server streams (the specificity of |
| // entries is checked in previous tests). |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| fle.mu.Lock() |
| if len(fle.entries) != 10 { |
| fle.mu.Unlock() |
t.Fatalf("Unexpected length of entries %v, want 10 (client and server entries combined)", len(fle.entries))
| } |
| fle.mu.Unlock() |
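// Make a streaming RPC that is immediately half-closed. This produces three
// events per side (client header, half close, and server trailer), so the
// exporter should end up with 16 entries total.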
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| fle.mu.Lock() |
| if len(fle.entries) != 16 { |
| fle.mu.Unlock() |
t.Fatalf("Unexpected length of entries %v, want 16 (client and server entries combined)", len(fle.entries))
| } |
| fle.mu.Unlock() |
| } |
| |
// TestClientRPCEventsTruncateHeaderAndMetadata tests the observability system
// configured with a client RPC event that logs every call and truncates
// headers and messages. It performs a Unary RPC, and expects events with
// truncated payloads and payloadTruncated set to true, signifying the system
// properly truncated the headers and messages it logged.
| func (s) TestClientRPCEventsTruncateHeaderAndMetadata(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| clientRPCEventLogAllConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: 10, |
| MaxMessageBytes: 2, |
| }, |
| }, |
| }, |
| } |
| cleanup, err := setupObservabilitySystemWithConfig(clientRPCEventLogAllConfig) |
| if err != nil { |
| t.Fatalf("error setting up observability: %v", err) |
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| md := metadata.MD{ |
| "key1": []string{"value1"}, |
| "key2": []string{"value2"}, |
| } |
| ctx = metadata.NewOutgoingContext(ctx, md) |
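// The 5-byte body makes the serialized request message 9 bytes on the wire,
// so with MaxMessageBytes set to 2 only the first two bytes of the message
// should be logged alongside the full 9-byte message length.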
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: []byte("00000")}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| grpcLogEntriesWant := []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{ |
| "key1": "value1", |
| "key2": "value2", |
| }, |
| }, |
| PayloadTruncated: true, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| Payload: payload{ |
| MessageLength: 9, |
| Message: []uint8{ |
| 0x1a, |
| 0x07, |
| }, |
| }, |
| PayloadTruncated: true, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 4, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| // Only one metadata entry should have been present in logging due to |
| // truncation. |
| if mdLen := len(fle.entries[0].Payload.Metadata); mdLen != 1 { |
| t.Fatalf("Metadata should have only 1 entry due to truncation, got %v", mdLen) |
| } |
| fle.mu.Unlock() |
| } |
| |
// TestPrecedenceOrderingInConfiguration tests the scenario where the logging
// part of observability is configured with three client RPC events: the first
// two on specific methods in the service, the last one on any method within
// the service. This test sends three RPCs, one matching each event. The
// logging behavior dictated by the first matching event is what should be
// used for emission. The second event specifies that logging be excluded on
// its matched RPCs, so an RPC that reaches and matches that event should
// generate no log entries.
| func (s) TestPrecedenceOrderingInConfiguration(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| threeEventsConfig := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"grpc.testing.TestService/UnaryCall"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| { |
| Methods: []string{"grpc.testing.TestService/EmptyCall"}, |
| Exclude: true, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| { |
| Methods: []string{"grpc.testing.TestService/*"}, |
| MaxMetadataBytes: 30, |
| MaxMessageBytes: 30, |
| }, |
| }, |
| }, |
| } |
| |
| cleanup, err := setupObservabilitySystemWithConfig(threeEventsConfig) |
| if err != nil { |
t.Fatalf("error setting up observability: %v", err)
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { |
| return &testpb.Empty{}, nil |
| }, |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| _, err := stream.Recv() |
| if err != io.EOF { |
| return err |
| } |
| return nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
// A Unary RPC should match the first event, and logs should correspond to
// its configuration. The first matching event is the one used, even though
// later events in the list would also match.
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| grpcLogEntriesWant := []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| Payload: payload{ |
| Message: []uint8{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 4, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.entries = nil |
| fle.mu.Unlock() |
| |
// A unary EmptyCall RPC should match the second event, which has the exclude
// flag set. Thus, the call should cause no downstream logs.
| if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| t.Fatalf("Unexpected error from EmptyCall: %v", err) |
| } |
| // The exporter should have received no new log entries due to this call. |
| fle.mu.Lock() |
| if len(fle.entries) != 0 { |
| fle.mu.Unlock() |
| t.Fatalf("Unexpected length of entries %v, want 0", len(fle.entries)) |
| } |
| fle.mu.Unlock() |
| |
// A third RPC, a full duplex call, matches neither of the first two events
// but does match the last one, whose wildcard covers every method in the
// service, so it should log according to that event's configuration.
| stream, err := ss.Client.FullDuplexCall(ctx) |
| if err != nil { |
t.Fatalf("ss.Client.FullDuplexCall failed: %v", err)
| } |
| |
| stream.CloseSend() |
| if _, err = stream.Recv(); err != io.EOF { |
| t.Fatalf("unexpected error: %v, expected an EOF error", err) |
| } |
| |
| grpcLogEntriesWant = []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeClientHalfClose, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "FullDuplexCall", |
| Authority: ss.Address, |
| SequenceID: 3, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.mu.Unlock() |
| } |
| |
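// TestTranslateMetadata tests that translateMetadata correctly converts
// binary logging metadata into a map from key to value: values for duplicate
// keys are joined with commas, with values for -bin keys base64 encoded
// before joining.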
| func (s) TestTranslateMetadata(t *testing.T) { |
| concatBinLogValue := base64.StdEncoding.EncodeToString([]byte("value1")) + "," + base64.StdEncoding.EncodeToString([]byte("value2")) |
| tests := []struct { |
| name string |
| binLogMD *binlogpb.Metadata |
| wantMD map[string]string |
| }{ |
| { |
| name: "two-entries-different-key", |
| binLogMD: &binlogpb.Metadata{ |
| Entry: []*binlogpb.MetadataEntry{ |
| { |
| Key: "header1", |
| Value: []byte("value1"), |
| }, |
| { |
| Key: "header2", |
| Value: []byte("value2"), |
| }, |
| }, |
| }, |
| wantMD: map[string]string{ |
| "header1": "value1", |
| "header2": "value2", |
| }, |
| }, |
| { |
| name: "two-entries-same-key", |
| binLogMD: &binlogpb.Metadata{ |
| Entry: []*binlogpb.MetadataEntry{ |
| { |
| Key: "header1", |
| Value: []byte("value1"), |
| }, |
| { |
| Key: "header1", |
| Value: []byte("value2"), |
| }, |
| }, |
| }, |
| wantMD: map[string]string{ |
| "header1": "value1,value2", |
| }, |
| }, |
| { |
| name: "two-entries-same-key-bin-header", |
| binLogMD: &binlogpb.Metadata{ |
| Entry: []*binlogpb.MetadataEntry{ |
| { |
| Key: "header1-bin", |
| Value: []byte("value1"), |
| }, |
| { |
| Key: "header1-bin", |
| Value: []byte("value2"), |
| }, |
| }, |
| }, |
| wantMD: map[string]string{ |
| "header1-bin": concatBinLogValue, |
| }, |
| }, |
| { |
| name: "four-entries-two-keys", |
| binLogMD: &binlogpb.Metadata{ |
| Entry: []*binlogpb.MetadataEntry{ |
| { |
| Key: "header1", |
| Value: []byte("value1"), |
| }, |
| { |
| Key: "header1", |
| Value: []byte("value2"), |
| }, |
| { |
| Key: "header1-bin", |
| Value: []byte("value1"), |
| }, |
| { |
| Key: "header1-bin", |
| Value: []byte("value2"), |
| }, |
| }, |
| }, |
| wantMD: map[string]string{ |
| "header1": "value1,value2", |
| "header1-bin": concatBinLogValue, |
| }, |
| }, |
| } |
| |
| for _, test := range tests { |
| t.Run(test.name, func(t *testing.T) { |
| if gotMD := translateMetadata(test.binLogMD); !cmp.Equal(gotMD, test.wantMD) { |
| t.Fatalf("translateMetadata(%v) = %v, want %v", test.binLogMD, gotMD, test.wantMD) |
| } |
| }) |
| } |
| } |
| |
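// TestMarshalJSON verifies that a fully populated grpcLogEntry marshals to
// JSON without error.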
| func (s) TestMarshalJSON(t *testing.T) { |
| logEntry := &grpcLogEntry{ |
| CallID: "300-300-300", |
| SequenceID: 3, |
| Type: eventTypeUnknown, |
| Logger: loggerClient, |
| Payload: payload{ |
| Metadata: map[string]string{"header1": "value1"}, |
| Timeout: 20, |
| StatusCode: "UNKNOWN", |
| StatusMessage: "ok", |
| StatusDetails: []byte("ok"), |
| MessageLength: 3, |
| Message: []byte("wow"), |
| }, |
| Peer: address{ |
| Type: ipv4, |
| Address: "localhost", |
| IPPort: 16000, |
| }, |
| PayloadTruncated: false, |
| Authority: "server", |
| ServiceName: "grpc-testing", |
| MethodName: "UnaryRPC", |
| } |
| if _, err := json.Marshal(logEntry); err != nil { |
| t.Fatalf("json.Marshal(%v) failed with error: %v", logEntry, err) |
| } |
| } |
| |
| // TestMetadataTruncationAccountsKey tests that the metadata truncation takes |
| // into account both the key and value of metadata. It configures an |
| // observability system with a maximum byte length for metadata, which is |
| // greater than just the byte length of the metadata value but less than the |
| // byte length of the metadata key + metadata value. Thus, in the ClientHeader |
| // logging event, no metadata should be logged. |
| func (s) TestMetadataTruncationAccountsKey(t *testing.T) { |
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| const mdValue = "value" |
| configMetadataLimit := &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*"}, |
| MaxMetadataBytes: len(mdValue) + 1, |
| }, |
| }, |
| }, |
| } |
| |
| cleanup, err := setupObservabilitySystemWithConfig(configMetadataLimit) |
| if err != nil { |
t.Fatalf("error setting up observability: %v", err)
| } |
| defer cleanup() |
| |
| ss := &stubserver.StubServer{ |
| UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { |
| return &testpb.SimpleResponse{}, nil |
| }, |
| } |
| if err := ss.Start(nil); err != nil { |
| t.Fatalf("Error starting endpoint server: %v", err) |
| } |
| defer ss.Stop() |
| |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
// The configured MaxMetadataBytes falls between len(mdValue) and len("key")
// + len(mdValue), so this metadata entry should not be logged.
| md := metadata.MD{ |
| "key": []string{mdValue}, |
| } |
| ctx = metadata.NewOutgoingContext(ctx, md) |
| if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: &testpb.Payload{Body: []byte("00000")}}); err != nil { |
| t.Fatalf("Unexpected error from UnaryCall: %v", err) |
| } |
| |
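// MaxMessageBytes is left at its zero value, so message payloads should be
// fully truncated: entries record the 9-byte message length but carry an
// empty message body.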
| grpcLogEntriesWant := []*grpcLogEntry{ |
| { |
| Type: eventTypeClientHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 1, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| PayloadTruncated: true, |
| }, |
| { |
| Type: eventTypeClientMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 2, |
| Authority: ss.Address, |
| Payload: payload{ |
| MessageLength: 9, |
| Message: []uint8{}, |
| }, |
| PayloadTruncated: true, |
| }, |
| { |
| Type: eventTypeServerHeader, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 3, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| }, |
| }, |
| { |
| Type: eventTypeServerMessage, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| Authority: ss.Address, |
| SequenceID: 4, |
| }, |
| { |
| Type: eventTypeServerTrailer, |
| Logger: loggerClient, |
| ServiceName: "grpc.testing.TestService", |
| MethodName: "UnaryCall", |
| SequenceID: 5, |
| Authority: ss.Address, |
| Payload: payload{ |
| Metadata: map[string]string{}, |
| StatusCode: "OK", |
| }, |
| }, |
| } |
| fle.mu.Lock() |
| if err := cmpLoggingEntryList(fle.entries, grpcLogEntriesWant); err != nil { |
| fle.mu.Unlock() |
t.Fatalf("error in logging entry list comparison: %v", err)
| } |
| fle.mu.Unlock() |
| } |
| |
// TestMethodInConfiguration tests different method names in the
// configuration, with expectations on whether each should cause Start to
// error.
| func (s) TestMethodInConfiguration(t *testing.T) { |
// Use a fake exporter to avoid creating a real Stackdriver exporter.
| fle := &fakeLoggingExporter{ |
| t: t, |
| } |
| |
| defer func(ne func(ctx context.Context, config *config) (loggingExporter, error)) { |
| newLoggingExporter = ne |
| }(newLoggingExporter) |
| |
| newLoggingExporter = func(ctx context.Context, config *config) (loggingExporter, error) { |
| return fle, nil |
| } |
| |
| tests := []struct { |
| name string |
| config *config |
| wantErr string |
| }{ |
| { |
| name: "leading-slash", |
| config: &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"/service/method"}, |
| }, |
| }, |
| }, |
| }, |
| wantErr: "cannot have a leading slash", |
| }, |
| { |
| name: "wildcard service/method", |
| config: &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"*/method"}, |
| }, |
| }, |
| }, |
| }, |
| wantErr: "cannot have service wildcard *", |
| }, |
| { |
| name: "/ in service name", |
| config: &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"ser/vice/method"}, |
| }, |
| }, |
| }, |
| }, |
| wantErr: "only one /", |
| }, |
| { |
| name: "empty method name", |
| config: &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"service/"}, |
| }, |
| }, |
| }, |
| }, |
| wantErr: "method name must be non empty", |
| }, |
| { |
| name: "normal", |
| config: &config{ |
| ProjectID: "fake", |
| CloudLogging: &cloudLogging{ |
| ClientRPCEvents: []clientRPCEvents{ |
| { |
| Methods: []string{"service/method"}, |
| }, |
| }, |
| }, |
| }, |
| wantErr: "", |
| }, |
| } |
| for _, test := range tests { |
| t.Run(test.name, func(t *testing.T) { |
| cleanup, gotErr := setupObservabilitySystemWithConfig(test.config) |
| if cleanup != nil { |
| defer cleanup() |
| } |
| if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) { |
| t.Fatalf("Start(%v) = %v, wantErr %v", test.config, gotErr, test.wantErr) |
| } |
| if (gotErr != nil) != (test.wantErr != "") { |
| t.Fatalf("Start(%v) = %v, wantErr %v", test.config, gotErr, test.wantErr) |
| } |
| }) |
| } |
| } |