use proto.Buffer API for protobuf codec and cache proto.Buffer structs (#1010)
* use a global sharded pool of proto.Buffer caches in protoCodec
* fix goimports
* make global buffer pool index counter atomic
* hack to remove alloc in encode_len_struct
* remove extra slice alloc in proto codec marshal
* replce magic number for proto size field length with constant
* replace custom cache with sync.Pool
* remove 1 line functions in codec.go and add protoCodec microbenchmarks
* add concurrent usage test for protoCodec
* fix golint.gofmt,goimport checks
* fix issues in codec.go and codec_test.go
* use go parallel benchmark helpers
* replace proto.Codec with a guess of size needed
* update Fatalf -> Errorf in tests
* wrap proto.Buffer along with cached last size into larger struct for pool use
* make wrapped proto buffer only a literal
* fix style and imports
* move b.Run into inner function
* reverse micro benchmark op order to unmarshal-marshal and fix benchmark setup-in-test bug
* add test for large message
* remove use of defer in codec.marshal
* revert recent changes to codec bencmarks
* move sub-benchmarks into >= go-1.7 only file
* add commentfor marshaler and tweak benchmark subtests for easier usage
* move build tag for go1.7 on benchmarks to inside file
* move build tag to top of file
* comment Codec, embed proto.Buffer into cached struct and add an int32 cap
diff --git a/codec.go b/codec.go
new file mode 100644
index 0000000..bd76ebb
--- /dev/null
+++ b/codec.go
@@ -0,0 +1,118 @@
+/*
+*
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+*/
+
+package grpc
+
+import (
+ "math"
+ "sync"
+
+ "github.com/golang/protobuf/proto"
+)
+
+// Codec defines the interface gRPC uses to encode and decode messages.
+// Note that implementations of this interface must be thread safe;
+// a Codec's methods can be called from concurrent goroutines.
+type Codec interface {
+ // Marshal returns the wire format of v.
+ Marshal(v interface{}) ([]byte, error)
+ // Unmarshal parses the wire format into v.
+ Unmarshal(data []byte, v interface{}) error
+ // String returns the name of the Codec implementation. The returned
+ // string will be used as part of content type in transmission.
+ String() string
+}
+
+// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
+type protoCodec struct {
+}
+
+type cachedProtoBuffer struct {
+ lastMarshaledSize uint32
+ proto.Buffer
+}
+
+func capToMaxInt32(val int) uint32 {
+ if val > math.MaxInt32 {
+ return uint32(math.MaxInt32)
+ }
+ return uint32(val)
+}
+
+func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
+ protoMsg := v.(proto.Message)
+ newSlice := make([]byte, 0, cb.lastMarshaledSize)
+
+ cb.SetBuf(newSlice)
+ cb.Reset()
+ if err := cb.Marshal(protoMsg); err != nil {
+ return nil, err
+ }
+ out := cb.Bytes()
+ cb.lastMarshaledSize = capToMaxInt32(len(out))
+ return out, nil
+}
+
+func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
+ cb := protoBufferPool.Get().(*cachedProtoBuffer)
+ out, err := p.marshal(v, cb)
+
+ // put back buffer and lose the ref to the slice
+ cb.SetBuf(nil)
+ protoBufferPool.Put(cb)
+ return out, err
+}
+
+func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
+ cb := protoBufferPool.Get().(*cachedProtoBuffer)
+ cb.SetBuf(data)
+ err := cb.Unmarshal(v.(proto.Message))
+ cb.SetBuf(nil)
+ protoBufferPool.Put(cb)
+ return err
+}
+
+func (protoCodec) String() string {
+ return "proto"
+}
+
+var (
+ protoBufferPool = &sync.Pool{
+ New: func() interface{} {
+ return &cachedProtoBuffer{
+ Buffer: proto.Buffer{},
+ lastMarshaledSize: 16,
+ }
+ },
+ }
+)
diff --git a/codec_benchmark_test.go b/codec_benchmark_test.go
new file mode 100644
index 0000000..6726a53
--- /dev/null
+++ b/codec_benchmark_test.go
@@ -0,0 +1,115 @@
+// +build go1.7
+
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc/test/codec_perf"
+)
+
+func setupBenchmarkProtoCodecInputs(b *testing.B, payloadBaseSize uint32) []proto.Message {
+ payloadBase := make([]byte, payloadBaseSize)
+ // arbitrary byte slices
+ payloadSuffixes := [][]byte{
+ []byte("one"),
+ []byte("two"),
+ []byte("three"),
+ []byte("four"),
+ []byte("five"),
+ }
+ protoStructs := make([]proto.Message, 0)
+
+ for _, p := range payloadSuffixes {
+ ps := &codec_perf.Buffer{}
+ ps.Body = append(payloadBase, p...)
+ protoStructs = append(protoStructs, ps)
+ }
+
+ return protoStructs
+}
+
+// The possible use of certain protobuf APIs like the proto.Buffer API potentially involves caching
+// on our side. This can add checks around memory allocations and possible contention.
+// Example run: go test -v -run=^$ -bench=BenchmarkProtoCodec -benchmem
+func BenchmarkProtoCodec(b *testing.B) {
+ // range of message sizes
+ payloadBaseSizes := make([]uint32, 0)
+ for i := uint32(0); i <= 12; i += 4 {
+ payloadBaseSizes = append(payloadBaseSizes, 1<<i)
+ }
+ // range of SetParallelism
+ parallelisms := make([]uint32, 0)
+ for i := uint32(0); i <= 16; i += 4 {
+ parallelisms = append(parallelisms, 1<<i)
+ }
+ for _, s := range payloadBaseSizes {
+ for _, p := range parallelisms {
+ func(parallelism int, payloadBaseSize uint32) {
+ protoStructs := setupBenchmarkProtoCodecInputs(b, payloadBaseSize)
+ name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", payloadBaseSize, parallelism)
+ b.Run(name, func(b *testing.B) {
+ codec := &protoCodec{}
+ b.SetParallelism(parallelism)
+ b.RunParallel(func(pb *testing.PB) {
+ benchmarkProtoCodec(codec, protoStructs, pb, b)
+ })
+ })
+ }(int(p), s)
+ }
+ }
+}
+
+func benchmarkProtoCodec(codec *protoCodec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) {
+ counter := 0
+ for pb.Next() {
+ counter++
+ ps := protoStructs[counter%len(protoStructs)]
+ fastMarshalAndUnmarshal(codec, ps, b)
+ }
+}
+
+func fastMarshalAndUnmarshal(protoCodec Codec, protoStruct proto.Message, b *testing.B) {
+ marshaledBytes, err := protoCodec.Marshal(protoStruct)
+ if err != nil {
+ b.Errorf("protoCodec.Marshal(_) returned an error")
+ }
+ if err := protoCodec.Unmarshal(marshaledBytes, protoStruct); err != nil {
+ b.Errorf("protoCodec.Unmarshal(_) returned an error")
+ }
+}
diff --git a/codec_test.go b/codec_test.go
new file mode 100644
index 0000000..8e9b215
--- /dev/null
+++ b/codec_test.go
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "bytes"
+ "sync"
+ "testing"
+
+ "google.golang.org/grpc/test/codec_perf"
+)
+
+func marshalAndUnmarshal(t *testing.T, protoCodec Codec, expectedBody []byte) {
+ p := &codec_perf.Buffer{}
+ p.Body = expectedBody
+
+ marshalledBytes, err := protoCodec.Marshal(p)
+ if err != nil {
+ t.Errorf("protoCodec.Marshal(_) returned an error")
+ }
+
+ if err := protoCodec.Unmarshal(marshalledBytes, p); err != nil {
+ t.Errorf("protoCodec.Unmarshal(_) returned an error")
+ }
+
+ if bytes.Compare(p.GetBody(), expectedBody) != 0 {
+ t.Errorf("Unexpected body; got %v; want %v", p.GetBody(), expectedBody)
+ }
+}
+
+func TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) {
+ marshalAndUnmarshal(t, protoCodec{}, []byte{1, 2, 3})
+}
+
+// Try to catch possible race conditions around use of pools
+func TestConcurrentUsage(t *testing.T) {
+ const (
+ numGoRoutines = 100
+ numMarshUnmarsh = 1000
+ )
+
+ // small, arbitrary byte slices
+ protoBodies := [][]byte{
+ []byte("one"),
+ []byte("two"),
+ []byte("three"),
+ []byte("four"),
+ []byte("five"),
+ }
+
+ var wg sync.WaitGroup
+ codec := protoCodec{}
+
+ for i := 0; i < numGoRoutines; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for k := 0; k < numMarshUnmarsh; k++ {
+ marshalAndUnmarshal(t, codec, protoBodies[k%len(protoBodies)])
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+// TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get
+// stomped on during reuse of a proto.Buffer.
+func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
+ codec1 := protoCodec{}
+ codec2 := protoCodec{}
+
+ expectedBody1 := []byte{1, 2, 3}
+ expectedBody2 := []byte{4, 5, 6}
+
+ proto1 := codec_perf.Buffer{Body: expectedBody1}
+ proto2 := codec_perf.Buffer{Body: expectedBody2}
+
+ var m1, m2 []byte
+ var err error
+
+ if m1, err = codec1.Marshal(&proto1); err != nil {
+ t.Errorf("protoCodec.Marshal(%v) failed", proto1)
+ }
+
+ if m2, err = codec2.Marshal(&proto2); err != nil {
+ t.Errorf("protoCodec.Marshal(%v) failed", proto2)
+ }
+
+ if err = codec1.Unmarshal(m1, &proto1); err != nil {
+ t.Errorf("protoCodec.Unmarshal(%v) failed", m1)
+ }
+
+ if err = codec2.Unmarshal(m2, &proto2); err != nil {
+ t.Errorf("protoCodec.Unmarshal(%v) failed", m2)
+ }
+
+ b1 := proto1.GetBody()
+ b2 := proto2.GetBody()
+
+ for i, v := range b1 {
+ if expectedBody1[i] != v {
+ t.Errorf("expected %v at index %v but got %v", i, expectedBody1[i], v)
+ }
+ }
+
+ for i, v := range b2 {
+ if expectedBody2[i] != v {
+ t.Errorf("expected %v at index %v but got %v", i, expectedBody2[i], v)
+ }
+ }
+}
diff --git a/rpc_util.go b/rpc_util.go
index db56a88..4d12528 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -43,7 +43,6 @@
"os"
"time"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@@ -53,32 +52,6 @@
"google.golang.org/grpc/transport"
)
-// Codec defines the interface gRPC uses to encode and decode messages.
-type Codec interface {
- // Marshal returns the wire format of v.
- Marshal(v interface{}) ([]byte, error)
- // Unmarshal parses the wire format into v.
- Unmarshal(data []byte, v interface{}) error
- // String returns the name of the Codec implementation. The returned
- // string will be used as part of content type in transmission.
- String() string
-}
-
-// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
-type protoCodec struct{}
-
-func (protoCodec) Marshal(v interface{}) ([]byte, error) {
- return proto.Marshal(v.(proto.Message))
-}
-
-func (protoCodec) Unmarshal(data []byte, v interface{}) error {
- return proto.Unmarshal(data, v.(proto.Message))
-}
-
-func (protoCodec) String() string {
- return "proto"
-}
-
// Compressor defines the interface gRPC uses to compress a message.
type Compressor interface {
// Do compresses p into w.