use proto.Buffer API for protobuf codec and cache proto.Buffer structs (#1010)

* use a global sharded pool of proto.Buffer caches in protoCodec

* fix goimports

* make global buffer pool index counter atomic

* hack to remove alloc in encode_len_struct

* remove extra slice alloc in proto codec marshal

* replce magic number for proto size field length with constant

* replace custom cache with sync.Pool

* remove 1 line functions in codec.go and add protoCodec microbenchmarks

* add concurrent usage test for protoCodec

* fix golint.gofmt,goimport checks

* fix issues in codec.go and codec_test.go

* use go parallel benchmark helpers

* replace proto.Codec with a guess of size needed

* update Fatalf -> Errorf in tests

* wrap proto.Buffer along with cached last size into larger struct for pool use

* make wrapped proto buffer only a literal

* fix style and imports

* move b.Run into inner function

* reverse micro benchmark op order to unmarshal-marshal and fix benchmark setup-in-test bug

* add test for large message

* remove use of defer in codec.marshal

* revert recent changes to codec bencmarks

* move sub-benchmarks into >= go-1.7 only file

* add commentfor marshaler and tweak benchmark subtests for easier usage

* move build tag for go1.7 on benchmarks to inside file

* move build tag to top of file

* comment Codec, embed proto.Buffer into cached struct and add an int32 cap
diff --git a/codec.go b/codec.go
new file mode 100644
index 0000000..bd76ebb
--- /dev/null
+++ b/codec.go
@@ -0,0 +1,118 @@
+/*
+*
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+*/
+
+package grpc
+
+import (
+	"math"
+	"sync"
+
+	"github.com/golang/protobuf/proto"
+)
+
+// Codec defines the interface gRPC uses to encode and decode messages.
+// Note that implementations of this interface must be thread safe;
+// a Codec's methods can be called from concurrent goroutines.
+type Codec interface {
+	// Marshal returns the wire format of v.
+	Marshal(v interface{}) ([]byte, error)
+	// Unmarshal parses the wire format into v.
+	Unmarshal(data []byte, v interface{}) error
+	// String returns the name of the Codec implementation. The returned
+	// string will be used as part of content type in transmission.
+	String() string
+}
+
+// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
+type protoCodec struct {
+}
+
+type cachedProtoBuffer struct {
+	lastMarshaledSize uint32
+	proto.Buffer
+}
+
+func capToMaxInt32(val int) uint32 {
+	if val > math.MaxInt32 {
+		return uint32(math.MaxInt32)
+	}
+	return uint32(val)
+}
+
+func (p protoCodec) marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) {
+	protoMsg := v.(proto.Message)
+	newSlice := make([]byte, 0, cb.lastMarshaledSize)
+
+	cb.SetBuf(newSlice)
+	cb.Reset()
+	if err := cb.Marshal(protoMsg); err != nil {
+		return nil, err
+	}
+	out := cb.Bytes()
+	cb.lastMarshaledSize = capToMaxInt32(len(out))
+	return out, nil
+}
+
+func (p protoCodec) Marshal(v interface{}) ([]byte, error) {
+	cb := protoBufferPool.Get().(*cachedProtoBuffer)
+	out, err := p.marshal(v, cb)
+
+	// put back buffer and lose the ref to the slice
+	cb.SetBuf(nil)
+	protoBufferPool.Put(cb)
+	return out, err
+}
+
+func (p protoCodec) Unmarshal(data []byte, v interface{}) error {
+	cb := protoBufferPool.Get().(*cachedProtoBuffer)
+	cb.SetBuf(data)
+	err := cb.Unmarshal(v.(proto.Message))
+	cb.SetBuf(nil)
+	protoBufferPool.Put(cb)
+	return err
+}
+
+func (protoCodec) String() string {
+	return "proto"
+}
+
+var (
+	protoBufferPool = &sync.Pool{
+		New: func() interface{} {
+			return &cachedProtoBuffer{
+				Buffer:            proto.Buffer{},
+				lastMarshaledSize: 16,
+			}
+		},
+	}
+)
diff --git a/codec_benchmark_test.go b/codec_benchmark_test.go
new file mode 100644
index 0000000..6726a53
--- /dev/null
+++ b/codec_benchmark_test.go
@@ -0,0 +1,115 @@
+// +build go1.7
+
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"google.golang.org/grpc/test/codec_perf"
+)
+
+func setupBenchmarkProtoCodecInputs(b *testing.B, payloadBaseSize uint32) []proto.Message {
+	payloadBase := make([]byte, payloadBaseSize)
+	// arbitrary byte slices
+	payloadSuffixes := [][]byte{
+		[]byte("one"),
+		[]byte("two"),
+		[]byte("three"),
+		[]byte("four"),
+		[]byte("five"),
+	}
+	protoStructs := make([]proto.Message, 0)
+
+	for _, p := range payloadSuffixes {
+		ps := &codec_perf.Buffer{}
+		ps.Body = append(payloadBase, p...)
+		protoStructs = append(protoStructs, ps)
+	}
+
+	return protoStructs
+}
+
+// The possible use of certain protobuf APIs like the proto.Buffer API potentially involves caching
+// on our side. This can add checks around memory allocations and possible contention.
+// Example run: go test -v -run=^$ -bench=BenchmarkProtoCodec -benchmem
+func BenchmarkProtoCodec(b *testing.B) {
+	// range of message sizes
+	payloadBaseSizes := make([]uint32, 0)
+	for i := uint32(0); i <= 12; i += 4 {
+		payloadBaseSizes = append(payloadBaseSizes, 1<<i)
+	}
+	// range of SetParallelism
+	parallelisms := make([]uint32, 0)
+	for i := uint32(0); i <= 16; i += 4 {
+		parallelisms = append(parallelisms, 1<<i)
+	}
+	for _, s := range payloadBaseSizes {
+		for _, p := range parallelisms {
+			func(parallelism int, payloadBaseSize uint32) {
+				protoStructs := setupBenchmarkProtoCodecInputs(b, payloadBaseSize)
+				name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", payloadBaseSize, parallelism)
+				b.Run(name, func(b *testing.B) {
+					codec := &protoCodec{}
+					b.SetParallelism(parallelism)
+					b.RunParallel(func(pb *testing.PB) {
+						benchmarkProtoCodec(codec, protoStructs, pb, b)
+					})
+				})
+			}(int(p), s)
+		}
+	}
+}
+
+func benchmarkProtoCodec(codec *protoCodec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) {
+	counter := 0
+	for pb.Next() {
+		counter++
+		ps := protoStructs[counter%len(protoStructs)]
+		fastMarshalAndUnmarshal(codec, ps, b)
+	}
+}
+
+func fastMarshalAndUnmarshal(protoCodec Codec, protoStruct proto.Message, b *testing.B) {
+	marshaledBytes, err := protoCodec.Marshal(protoStruct)
+	if err != nil {
+		b.Errorf("protoCodec.Marshal(_) returned an error")
+	}
+	if err := protoCodec.Unmarshal(marshaledBytes, protoStruct); err != nil {
+		b.Errorf("protoCodec.Unmarshal(_) returned an error")
+	}
+}
diff --git a/codec_test.go b/codec_test.go
new file mode 100644
index 0000000..8e9b215
--- /dev/null
+++ b/codec_test.go
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+	"bytes"
+	"sync"
+	"testing"
+
+	"google.golang.org/grpc/test/codec_perf"
+)
+
+func marshalAndUnmarshal(t *testing.T, protoCodec Codec, expectedBody []byte) {
+	p := &codec_perf.Buffer{}
+	p.Body = expectedBody
+
+	marshalledBytes, err := protoCodec.Marshal(p)
+	if err != nil {
+		t.Errorf("protoCodec.Marshal(_) returned an error")
+	}
+
+	if err := protoCodec.Unmarshal(marshalledBytes, p); err != nil {
+		t.Errorf("protoCodec.Unmarshal(_) returned an error")
+	}
+
+	if bytes.Compare(p.GetBody(), expectedBody) != 0 {
+		t.Errorf("Unexpected body; got %v; want %v", p.GetBody(), expectedBody)
+	}
+}
+
+func TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) {
+	marshalAndUnmarshal(t, protoCodec{}, []byte{1, 2, 3})
+}
+
+// Try to catch possible race conditions around use of pools
+func TestConcurrentUsage(t *testing.T) {
+	const (
+		numGoRoutines   = 100
+		numMarshUnmarsh = 1000
+	)
+
+	// small, arbitrary byte slices
+	protoBodies := [][]byte{
+		[]byte("one"),
+		[]byte("two"),
+		[]byte("three"),
+		[]byte("four"),
+		[]byte("five"),
+	}
+
+	var wg sync.WaitGroup
+	codec := protoCodec{}
+
+	for i := 0; i < numGoRoutines; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for k := 0; k < numMarshUnmarsh; k++ {
+				marshalAndUnmarshal(t, codec, protoBodies[k%len(protoBodies)])
+			}
+		}()
+	}
+
+	wg.Wait()
+}
+
+// TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get
+// stomped on during reuse of a proto.Buffer.
+func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
+	codec1 := protoCodec{}
+	codec2 := protoCodec{}
+
+	expectedBody1 := []byte{1, 2, 3}
+	expectedBody2 := []byte{4, 5, 6}
+
+	proto1 := codec_perf.Buffer{Body: expectedBody1}
+	proto2 := codec_perf.Buffer{Body: expectedBody2}
+
+	var m1, m2 []byte
+	var err error
+
+	if m1, err = codec1.Marshal(&proto1); err != nil {
+		t.Errorf("protoCodec.Marshal(%v) failed", proto1)
+	}
+
+	if m2, err = codec2.Marshal(&proto2); err != nil {
+		t.Errorf("protoCodec.Marshal(%v) failed", proto2)
+	}
+
+	if err = codec1.Unmarshal(m1, &proto1); err != nil {
+		t.Errorf("protoCodec.Unmarshal(%v) failed", m1)
+	}
+
+	if err = codec2.Unmarshal(m2, &proto2); err != nil {
+		t.Errorf("protoCodec.Unmarshal(%v) failed", m2)
+	}
+
+	b1 := proto1.GetBody()
+	b2 := proto2.GetBody()
+
+	for i, v := range b1 {
+		if expectedBody1[i] != v {
+			t.Errorf("expected %v at index %v but got %v", i, expectedBody1[i], v)
+		}
+	}
+
+	for i, v := range b2 {
+		if expectedBody2[i] != v {
+			t.Errorf("expected %v at index %v but got %v", i, expectedBody2[i], v)
+		}
+	}
+}
diff --git a/rpc_util.go b/rpc_util.go
index db56a88..4d12528 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -43,7 +43,6 @@
 	"os"
 	"time"
 
-	"github.com/golang/protobuf/proto"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
@@ -53,32 +52,6 @@
 	"google.golang.org/grpc/transport"
 )
 
-// Codec defines the interface gRPC uses to encode and decode messages.
-type Codec interface {
-	// Marshal returns the wire format of v.
-	Marshal(v interface{}) ([]byte, error)
-	// Unmarshal parses the wire format into v.
-	Unmarshal(data []byte, v interface{}) error
-	// String returns the name of the Codec implementation. The returned
-	// string will be used as part of content type in transmission.
-	String() string
-}
-
-// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC.
-type protoCodec struct{}
-
-func (protoCodec) Marshal(v interface{}) ([]byte, error) {
-	return proto.Marshal(v.(proto.Message))
-}
-
-func (protoCodec) Unmarshal(data []byte, v interface{}) error {
-	return proto.Unmarshal(data, v.(proto.Message))
-}
-
-func (protoCodec) String() string {
-	return "proto"
-}
-
 // Compressor defines the interface gRPC uses to compress a message.
 type Compressor interface {
 	// Do compresses p into w.