Use pooled gzip.{Writer,Reader} in gzip{Compressor,Decompressor} (#1217)

This change saves a lot of memory by reusing the underlying
gzip.{Writer,Reader}, which allocates up to 1.4mb at every instanciation
according to [1]. This was fixed by adding a Reset method by to the
object at [2].

The amount of memory (and GC time) saved is pretty high, as reported by
pprof:

      flat  flat%   sum%        cum   cum%
   28.33GB 85.70% 85.70%    32.74GB 99.05%  compress/flate.NewWriter

      flat  flat%   sum%        cum   cum%
   19.39MB 16.74% 16.74%    22.07MB 19.05%  compress/flate.NewWriter

And the benchmarks:

benchmark                           old ns/op     new ns/op     delta
BenchmarkGZIPCompressor1B-4         215170        22291         -89.64%
BenchmarkGZIPCompressor1KiB-4       225971        27213         -87.96%
BenchmarkGZIPCompressor8KiB-4       246696        54785         -77.79%
BenchmarkGZIPCompressor64KiB-4      444851        286924        -35.50%
BenchmarkGZIPCompressor512KiB-4     2279043       2115863       -7.16%
BenchmarkGZIPCompressor1MiB-4       4412989       4258635       -3.50%

benchmark                           old allocs     new allocs     delta
BenchmarkGZIPCompressor1B-4         17             0              -100.00%
BenchmarkGZIPCompressor1KiB-4       17             0              -100.00%
BenchmarkGZIPCompressor8KiB-4       17             0              -100.00%
BenchmarkGZIPCompressor64KiB-4      17             0              -100.00%
BenchmarkGZIPCompressor512KiB-4     17             0              -100.00%
BenchmarkGZIPCompressor1MiB-4       17             0              -100.00%

benchmark                           old bytes     new bytes     delta
BenchmarkGZIPCompressor1B-4         813872        8             -100.00%
BenchmarkGZIPCompressor1KiB-4       813872        16            -100.00%
BenchmarkGZIPCompressor8KiB-4       813875        27            -100.00%
BenchmarkGZIPCompressor64KiB-4      813918        190           -99.98%
BenchmarkGZIPCompressor512KiB-4     814928        1871          -99.77%
BenchmarkGZIPCompressor1MiB-4       820889        9735          -98.81%

[1] https://github.com/golang/go/issues/6138
[2] https://github.com/golang/go/commit/db12f9d4e406dcab81b476e955c8e119112522fa

Signed-off-by: Steeve Morin <steeve.morin@gmail.com>
diff --git a/rpc_util.go b/rpc_util.go
index bd8379c..31a8732 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -41,6 +41,7 @@
 	"io/ioutil"
 	"math"
 	"os"
+	"sync"
 	"time"
 
 	"golang.org/x/net/context"
@@ -60,16 +61,24 @@
 	Type() string
 }
 
-// NewGZIPCompressor creates a Compressor based on GZIP.
-func NewGZIPCompressor() Compressor {
-	return &gzipCompressor{}
+type gzipCompressor struct {
+	pool sync.Pool
 }
 
-type gzipCompressor struct {
+// NewGZIPCompressor creates a Compressor based on GZIP.
+func NewGZIPCompressor() Compressor {
+	return &gzipCompressor{
+		pool: sync.Pool{
+			New: func() interface{} {
+				return gzip.NewWriter(ioutil.Discard)
+			},
+		},
+	}
 }
 
 func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
-	z := gzip.NewWriter(w)
+	z := c.pool.Get().(*gzip.Writer)
+	z.Reset(w)
 	if _, err := z.Write(p); err != nil {
 		return err
 	}
@@ -89,6 +98,7 @@
 }
 
 type gzipDecompressor struct {
+	pool sync.Pool
 }
 
 // NewGZIPDecompressor creates a Decompressor based on GZIP.
@@ -97,11 +107,26 @@
 }
 
 func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
-	z, err := gzip.NewReader(r)
-	if err != nil {
-		return nil, err
+	var z *gzip.Reader
+	switch maybeZ := d.pool.Get().(type) {
+	case nil:
+		newZ, err := gzip.NewReader(r)
+		if err != nil {
+			return nil, err
+		}
+		z = newZ
+	case *gzip.Reader:
+		z = maybeZ
+		if err := z.Reset(r); err != nil {
+			d.pool.Put(z)
+			return nil, err
+		}
 	}
-	defer z.Close()
+
+	defer func() {
+		z.Close()
+		d.pool.Put(z)
+	}()
 	return ioutil.ReadAll(z)
 }
 
diff --git a/rpc_util_test.go b/rpc_util_test.go
index 8c92b96..b2b85c7 100644
--- a/rpc_util_test.go
+++ b/rpc_util_test.go
@@ -130,7 +130,7 @@
 		// outputs
 		err error
 	}{
-		{make([]byte, 1024), &gzipCompressor{}, &gzipDecompressor{}, nil},
+		{make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil},
 	} {
 		b := new(bytes.Buffer)
 		if err := test.cp.Do(b, test.data); err != test.err {
@@ -202,3 +202,40 @@
 func BenchmarkEncode1MiB(b *testing.B) {
 	bmEncode(b, 1024*1024)
 }
+
+// bmCompressor benchmarks a compressor of a Protocol Buffer message containing
+// mSize bytes.
+func bmCompressor(b *testing.B, mSize int, cp Compressor) {
+	payload := make([]byte, mSize)
+	cBuf := bytes.NewBuffer(make([]byte, mSize))
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cp.Do(cBuf, payload)
+		cBuf.Reset()
+	}
+}
+
+func BenchmarkGZIPCompressor1B(b *testing.B) {
+	bmCompressor(b, 1, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor1KiB(b *testing.B) {
+	bmCompressor(b, 1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor8KiB(b *testing.B) {
+	bmCompressor(b, 8*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor64KiB(b *testing.B) {
+	bmCompressor(b, 64*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor512KiB(b *testing.B) {
+	bmCompressor(b, 512*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor1MiB(b *testing.B) {
+	bmCompressor(b, 1024*1024, NewGZIPCompressor())
+}