| /* |
| * |
| * Copyright 2014 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package grpc |
| |
| import ( |
| "bytes" |
| "compress/gzip" |
| "context" |
| "errors" |
| "io" |
| "math" |
| "reflect" |
| "sync" |
| "testing" |
| |
| "github.com/google/go-cmp/cmp" |
| "github.com/google/go-cmp/cmp/cmpopts" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/encoding" |
| _ "google.golang.org/grpc/encoding/gzip" |
| protoenc "google.golang.org/grpc/encoding/proto" |
| "google.golang.org/grpc/internal/testutils" |
| "google.golang.org/grpc/internal/transport" |
| "google.golang.org/grpc/mem" |
| "google.golang.org/grpc/status" |
| perfpb "google.golang.org/grpc/test/codec_perf" |
| "google.golang.org/protobuf/proto" |
| ) |
| |
| const ( |
| defaultDecompressedData = "default decompressed data" |
| decompressionErrorMsg = "invalid compression format" |
| ) |
| |
| type fullReader struct { |
| data []byte |
| } |
| |
| func (f *fullReader) ReadMessageHeader(header []byte) error { |
| buf, err := f.Read(len(header)) |
| defer buf.Free() |
| if err != nil { |
| return err |
| } |
| |
| buf.CopyTo(header) |
| return nil |
| } |
| |
| func (f *fullReader) Read(n int) (mem.BufferSlice, error) { |
| if n == 0 { |
| return nil, nil |
| } |
| |
| if len(f.data) == 0 { |
| return nil, io.EOF |
| } |
| |
| if len(f.data) < n { |
| data := f.data |
| f.data = nil |
| return mem.BufferSlice{mem.SliceBuffer(data)}, io.ErrUnexpectedEOF |
| } |
| |
| buf := f.data[:n] |
| f.data = f.data[n:] |
| |
| return mem.BufferSlice{mem.SliceBuffer(buf)}, nil |
| } |
| |
| var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface |
| |
| func (s) TestSimpleParsing(t *testing.T) { |
| bigMsg := bytes.Repeat([]byte{'x'}, 1<<24) |
| for _, test := range []struct { |
| // input |
| p []byte |
| // outputs |
| err error |
| b []byte |
| pt payloadFormat |
| }{ |
| {nil, io.EOF, nil, compressionNone}, |
| {[]byte{0, 0, 0, 0, 0}, nil, nil, compressionNone}, |
| {[]byte{0, 0, 0, 0, 1, 'a'}, nil, []byte{'a'}, compressionNone}, |
| {[]byte{1, 0}, io.ErrUnexpectedEOF, nil, compressionNone}, |
| {[]byte{0, 0, 0, 0, 10, 'a'}, io.ErrUnexpectedEOF, nil, compressionNone}, |
| // Check that messages with length >= 2^24 are parsed. |
| {append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone}, |
| } { |
| buf := &fullReader{test.p} |
| parser := &parser{r: buf, bufferPool: mem.DefaultBufferPool()} |
| pt, b, err := parser.recvMsg(math.MaxInt32) |
| if err != test.err || !bytes.Equal(b.Materialize(), test.b) || pt != test.pt { |
| t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err) |
| } |
| } |
| } |
| |
| func (s) TestMultipleParsing(t *testing.T) { |
| // Set a byte stream consists of 3 messages with their headers. |
| p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'} |
| b := &fullReader{p} |
| parser := &parser{r: b, bufferPool: mem.DefaultBufferPool()} |
| |
| wantRecvs := []struct { |
| pt payloadFormat |
| data []byte |
| }{ |
| {compressionNone, []byte("a")}, |
| {compressionNone, []byte("bc")}, |
| {compressionNone, []byte("d")}, |
| } |
| for i, want := range wantRecvs { |
| pt, data, err := parser.recvMsg(math.MaxInt32) |
| if err != nil || pt != want.pt || !reflect.DeepEqual(data.Materialize(), want.data) { |
| t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>", |
| i, p, pt, data, err, want.pt, want.data) |
| } |
| } |
| |
| pt, data, err := parser.recvMsg(math.MaxInt32) |
| if err != io.EOF { |
| t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v", |
| len(wantRecvs), p, pt, data, err, io.EOF) |
| } |
| } |
| |
| func (s) TestEncode(t *testing.T) { |
| for _, test := range []struct { |
| // input |
| msg proto.Message |
| // outputs |
| hdr []byte |
| data []byte |
| err error |
| }{ |
| {nil, []byte{0, 0, 0, 0, 0}, []byte{}, nil}, |
| } { |
| data, err := encode(getCodec(protoenc.Name), test.msg) |
| if err != test.err || !bytes.Equal(data.Materialize(), test.data) { |
| t.Errorf("encode(_, %v) = %v, %v; want %v, %v", test.msg, data, err, test.data, test.err) |
| continue |
| } |
| if hdr, _ := msgHeader(data, nil, compressionNone); !bytes.Equal(hdr, test.hdr) { |
| t.Errorf("msgHeader(%v, false) = %v; want %v", data, hdr, test.hdr) |
| } |
| } |
| } |
| |
| func (s) TestCompress(t *testing.T) { |
| bestCompressor, err := NewGZIPCompressorWithLevel(gzip.BestCompression) |
| if err != nil { |
| t.Fatalf("Could not initialize gzip compressor with best compression.") |
| } |
| bestSpeedCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed) |
| if err != nil { |
| t.Fatalf("Could not initialize gzip compressor with best speed compression.") |
| } |
| |
| defaultCompressor, err := NewGZIPCompressorWithLevel(gzip.BestSpeed) |
| if err != nil { |
| t.Fatalf("Could not initialize gzip compressor with default compression.") |
| } |
| |
| level5, err := NewGZIPCompressorWithLevel(5) |
| if err != nil { |
| t.Fatalf("Could not initialize gzip compressor with level 5 compression.") |
| } |
| |
| for _, test := range []struct { |
| // input |
| data []byte |
| cp Compressor |
| dc Decompressor |
| // outputs |
| err error |
| }{ |
| {make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil}, |
| {make([]byte, 1024), bestCompressor, NewGZIPDecompressor(), nil}, |
| {make([]byte, 1024), bestSpeedCompressor, NewGZIPDecompressor(), nil}, |
| {make([]byte, 1024), defaultCompressor, NewGZIPDecompressor(), nil}, |
| {make([]byte, 1024), level5, NewGZIPDecompressor(), nil}, |
| } { |
| b := new(bytes.Buffer) |
| if err := test.cp.Do(b, test.data); err != test.err { |
| t.Fatalf("Compressor.Do(_, %v) = %v, want %v", test.data, err, test.err) |
| } |
| if b.Len() >= len(test.data) { |
| t.Fatalf("The compressor fails to compress data.") |
| } |
| if p, err := test.dc.Do(b); err != nil || !bytes.Equal(test.data, p) { |
| t.Fatalf("Decompressor.Do(%v) = %v, %v, want %v, <nil>", b, p, err, test.data) |
| } |
| } |
| } |
| |
| func (s) TestToRPCErr(t *testing.T) { |
| for _, test := range []struct { |
| // input |
| errIn error |
| // outputs |
| errOut error |
| }{ |
| {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)}, |
| {io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())}, |
| } { |
| err := toRPCErr(test.errIn) |
| if _, ok := status.FromError(err); !ok { |
| t.Errorf("toRPCErr{%v} returned type %T, want %T", test.errIn, err, status.Error) |
| } |
| if !testutils.StatusErrEqual(err, test.errOut) { |
| t.Errorf("toRPCErr{%v} = %v \nwant %v", test.errIn, err, test.errOut) |
| } |
| } |
| } |
| |
| // bmEncode benchmarks encoding a Protocol Buffer message containing mSize |
| // bytes. |
| func bmEncode(b *testing.B, mSize int) { |
| cdc := getCodec(protoenc.Name) |
| msg := &perfpb.Buffer{Body: make([]byte, mSize)} |
| encodeData, _ := encode(cdc, msg) |
| encodedSz := int64(len(encodeData)) |
| b.ReportAllocs() |
| b.ResetTimer() |
| for i := 0; i < b.N; i++ { |
| encode(cdc, msg) |
| } |
| b.SetBytes(encodedSz) |
| } |
| |
| func BenchmarkEncode1B(b *testing.B) { |
| bmEncode(b, 1) |
| } |
| |
| func BenchmarkEncode1KiB(b *testing.B) { |
| bmEncode(b, 1024) |
| } |
| |
| func BenchmarkEncode8KiB(b *testing.B) { |
| bmEncode(b, 8*1024) |
| } |
| |
| func BenchmarkEncode64KiB(b *testing.B) { |
| bmEncode(b, 64*1024) |
| } |
| |
| func BenchmarkEncode512KiB(b *testing.B) { |
| bmEncode(b, 512*1024) |
| } |
| |
| func BenchmarkEncode1MiB(b *testing.B) { |
| bmEncode(b, 1024*1024) |
| } |
| |
| // bmCompressor benchmarks a compressor of a Protocol Buffer message containing |
| // mSize bytes. |
| func bmCompressor(b *testing.B, mSize int, cp Compressor) { |
| payload := make([]byte, mSize) |
| cBuf := bytes.NewBuffer(make([]byte, mSize)) |
| b.ReportAllocs() |
| b.ResetTimer() |
| for i := 0; i < b.N; i++ { |
| cp.Do(cBuf, payload) |
| cBuf.Reset() |
| } |
| } |
| |
| func BenchmarkGZIPCompressor1B(b *testing.B) { |
| bmCompressor(b, 1, NewGZIPCompressor()) |
| } |
| |
| func BenchmarkGZIPCompressor1KiB(b *testing.B) { |
| bmCompressor(b, 1024, NewGZIPCompressor()) |
| } |
| |
| func BenchmarkGZIPCompressor8KiB(b *testing.B) { |
| bmCompressor(b, 8*1024, NewGZIPCompressor()) |
| } |
| |
| func BenchmarkGZIPCompressor64KiB(b *testing.B) { |
| bmCompressor(b, 64*1024, NewGZIPCompressor()) |
| } |
| |
| func BenchmarkGZIPCompressor512KiB(b *testing.B) { |
| bmCompressor(b, 512*1024, NewGZIPCompressor()) |
| } |
| |
| func BenchmarkGZIPCompressor1MiB(b *testing.B) { |
| bmCompressor(b, 1024*1024, NewGZIPCompressor()) |
| } |
| |
| // compressWithDeterministicError compresses the input data and returns a BufferSlice. |
| func compressWithDeterministicError(t *testing.T, input []byte) mem.BufferSlice { |
| t.Helper() |
| var buf bytes.Buffer |
| gz := gzip.NewWriter(&buf) |
| if _, err := gz.Write(input); err != nil { |
| t.Fatalf("compressInput() failed to write data: %v", err) |
| } |
| if err := gz.Close(); err != nil { |
| t.Fatalf("compressInput() failed to close gzip writer: %v", err) |
| } |
| compressedData := buf.Bytes() |
| return mem.BufferSlice{mem.NewBuffer(&compressedData, nil)} |
| } |
| |
| // MockDecompressor is a mock implementation of a decompressor used for testing purposes. |
| // It simulates decompression behavior, returning either decompressed data or an error based on the ShouldError flag. |
| type MockDecompressor struct { |
| ShouldError bool // Flag to control whether the decompression should simulate an error. |
| } |
| |
| // Do simulates decompression. It returns a predefined error if ShouldError is true, |
| // or a fixed set of decompressed data if ShouldError is false. |
| func (m *MockDecompressor) Do(_ io.Reader) ([]byte, error) { |
| if m.ShouldError { |
| return nil, errors.New(decompressionErrorMsg) |
| } |
| return []byte(defaultDecompressedData), nil |
| } |
| |
| // Type returns the string identifier for the MockDecompressor. |
| func (m *MockDecompressor) Type() string { |
| return "MockDecompressor" |
| } |
| |
| // TestDecompress tests the decompress function behaves correctly for following scenarios |
| // decompress successfully when message is <= maxReceiveMessageSize |
| // errors when message > maxReceiveMessageSize |
| // decompress successfully when maxReceiveMessageSize is MaxInt |
| // errors when the decompressed message has an invalid format |
| // errors when the decompressed message exceeds the maxReceiveMessageSize. |
| func (s) TestDecompress(t *testing.T) { |
| compressor := encoding.GetCompressor("gzip") |
| validDecompressor := &MockDecompressor{ShouldError: false} |
| invalidFormatDecompressor := &MockDecompressor{ShouldError: true} |
| |
| testCases := []struct { |
| name string |
| input mem.BufferSlice |
| dc Decompressor |
| maxReceiveMessageSize int |
| want []byte |
| wantErr error |
| }{ |
| { |
| name: "Decompresses successfully with sufficient buffer size", |
| input: compressWithDeterministicError(t, []byte("decompressed data")), |
| dc: nil, |
| maxReceiveMessageSize: 50, |
| want: []byte("decompressed data"), |
| wantErr: nil, |
| }, |
| { |
| name: "Fails due to exceeding maxReceiveMessageSize", |
| input: compressWithDeterministicError(t, []byte("message that is too large")), |
| dc: nil, |
| maxReceiveMessageSize: len("message that is too large") - 1, |
| want: nil, |
| wantErr: status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max %d", len("message that is too large")-1), |
| }, |
| { |
| name: "Decompresses to exactly maxReceiveMessageSize", |
| input: compressWithDeterministicError(t, []byte("exact size message")), |
| dc: nil, |
| maxReceiveMessageSize: len("exact size message"), |
| want: []byte("exact size message"), |
| wantErr: nil, |
| }, |
| { |
| name: "Decompresses successfully with maxReceiveMessageSize MaxInt", |
| input: compressWithDeterministicError(t, []byte("large message")), |
| dc: nil, |
| maxReceiveMessageSize: math.MaxInt, |
| want: []byte("large message"), |
| wantErr: nil, |
| }, |
| { |
| name: "Fails with decompression error due to invalid format", |
| input: compressWithDeterministicError(t, []byte("invalid compressed data")), |
| dc: invalidFormatDecompressor, |
| maxReceiveMessageSize: 50, |
| want: nil, |
| wantErr: status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", errors.New(decompressionErrorMsg)), |
| }, |
| { |
| name: "Fails with resourceExhausted error when decompressed message exceeds maxReceiveMessageSize", |
| input: compressWithDeterministicError(t, []byte("large compressed data")), |
| dc: validDecompressor, |
| maxReceiveMessageSize: 20, |
| want: nil, |
| wantErr: status.Errorf(codes.ResourceExhausted, "grpc: message after decompression larger than max (%d vs. %d)", 25, 20), |
| }, |
| } |
| |
| for _, tc := range testCases { |
| t.Run(tc.name, func(t *testing.T) { |
| output, err := decompress(compressor, tc.input, tc.dc, tc.maxReceiveMessageSize, mem.DefaultBufferPool()) |
| if !cmp.Equal(err, tc.wantErr, cmpopts.EquateErrors()) { |
| t.Fatalf("decompress() err = %v, wantErr = %v", err, tc.wantErr) |
| } |
| if !cmp.Equal(tc.want, output.Materialize()) { |
| t.Fatalf("decompress() output mismatch: got = %v, want = %v", output.Materialize(), tc.want) |
| } |
| }) |
| } |
| } |
| |
| type mockCompressor struct { |
| // Written to by the io.Reader on every call to Read. |
| ch chan<- struct{} |
| } |
| |
| func (m *mockCompressor) Compress(io.Writer) (io.WriteCloser, error) { |
| panic("unimplemented") |
| } |
| |
| func (m *mockCompressor) Decompress(io.Reader) (io.Reader, error) { |
| return m, nil |
| } |
| |
| func (m *mockCompressor) Read([]byte) (int, error) { |
| m.ch <- struct{}{} |
| return 1, io.EOF |
| } |
| |
| func (m *mockCompressor) Name() string { return "" } |
| |
| // Tests that the decompressor's Read method is not called after it returns EOF. |
| func (s) TestDecompress_NoReadAfterEOF(t *testing.T) { |
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| defer cancel() |
| |
| ch := make(chan struct{}, 10) |
| mc := &mockCompressor{ch: ch} |
| in := mem.BufferSlice{mem.NewBuffer(&[]byte{1, 2, 3}, nil)} |
| wg := sync.WaitGroup{} |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| out, err := decompress(mc, in, nil, 1, mem.DefaultBufferPool()) |
| if err != nil { |
| t.Errorf("Unexpected error from decompress: %v", err) |
| return |
| } |
| out.Free() |
| }() |
| select { |
| case <-ch: |
| case <-ctx.Done(): |
| t.Fatalf("Timed out waiting for call to compressor") |
| } |
| ctx, cancel = context.WithTimeout(ctx, defaultTestShortTimeout) |
| defer cancel() |
| select { |
| case <-ch: |
| t.Fatalf("Unexpected second compressor.Read call detected") |
| case <-ctx.Done(): |
| } |
| wg.Wait() |
| } |