Use pooled gzip.{Writer,Reader} in gzip{Compressor,Decompressor} (#1217)
This change saves a lot of memory by reusing the underlying
gzip.{Writer,Reader}, which allocates up to 1.4mb at every instanciation
according to [1]. This was fixed by adding a Reset method by to the
object at [2].
The amount of memory (and GC time) saved is pretty high, as reported by
pprof:
flat flat% sum% cum cum%
28.33GB 85.70% 85.70% 32.74GB 99.05% compress/flate.NewWriter
flat flat% sum% cum cum%
19.39MB 16.74% 16.74% 22.07MB 19.05% compress/flate.NewWriter
And the benchmarks:
benchmark old ns/op new ns/op delta
BenchmarkGZIPCompressor1B-4 215170 22291 -89.64%
BenchmarkGZIPCompressor1KiB-4 225971 27213 -87.96%
BenchmarkGZIPCompressor8KiB-4 246696 54785 -77.79%
BenchmarkGZIPCompressor64KiB-4 444851 286924 -35.50%
BenchmarkGZIPCompressor512KiB-4 2279043 2115863 -7.16%
BenchmarkGZIPCompressor1MiB-4 4412989 4258635 -3.50%
benchmark old allocs new allocs delta
BenchmarkGZIPCompressor1B-4 17 0 -100.00%
BenchmarkGZIPCompressor1KiB-4 17 0 -100.00%
BenchmarkGZIPCompressor8KiB-4 17 0 -100.00%
BenchmarkGZIPCompressor64KiB-4 17 0 -100.00%
BenchmarkGZIPCompressor512KiB-4 17 0 -100.00%
BenchmarkGZIPCompressor1MiB-4 17 0 -100.00%
benchmark old bytes new bytes delta
BenchmarkGZIPCompressor1B-4 813872 8 -100.00%
BenchmarkGZIPCompressor1KiB-4 813872 16 -100.00%
BenchmarkGZIPCompressor8KiB-4 813875 27 -100.00%
BenchmarkGZIPCompressor64KiB-4 813918 190 -99.98%
BenchmarkGZIPCompressor512KiB-4 814928 1871 -99.77%
BenchmarkGZIPCompressor1MiB-4 820889 9735 -98.81%
[1] https://github.com/golang/go/issues/6138
[2] https://github.com/golang/go/commit/db12f9d4e406dcab81b476e955c8e119112522fa
Signed-off-by: Steeve Morin <steeve.morin@gmail.com>
diff --git a/rpc_util.go b/rpc_util.go
index bd8379c..31a8732 100644
--- a/rpc_util.go
+++ b/rpc_util.go
@@ -41,6 +41,7 @@
"io/ioutil"
"math"
"os"
+ "sync"
"time"
"golang.org/x/net/context"
@@ -60,16 +61,24 @@
Type() string
}
-// NewGZIPCompressor creates a Compressor based on GZIP.
-func NewGZIPCompressor() Compressor {
- return &gzipCompressor{}
+type gzipCompressor struct {
+ pool sync.Pool
}
-type gzipCompressor struct {
+// NewGZIPCompressor creates a Compressor based on GZIP.
+func NewGZIPCompressor() Compressor {
+ return &gzipCompressor{
+ pool: sync.Pool{
+ New: func() interface{} {
+ return gzip.NewWriter(ioutil.Discard)
+ },
+ },
+ }
}
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
- z := gzip.NewWriter(w)
+ z := c.pool.Get().(*gzip.Writer)
+ z.Reset(w)
if _, err := z.Write(p); err != nil {
return err
}
@@ -89,6 +98,7 @@
}
type gzipDecompressor struct {
+ pool sync.Pool
}
// NewGZIPDecompressor creates a Decompressor based on GZIP.
@@ -97,11 +107,26 @@
}
func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
- z, err := gzip.NewReader(r)
- if err != nil {
- return nil, err
+ var z *gzip.Reader
+ switch maybeZ := d.pool.Get().(type) {
+ case nil:
+ newZ, err := gzip.NewReader(r)
+ if err != nil {
+ return nil, err
+ }
+ z = newZ
+ case *gzip.Reader:
+ z = maybeZ
+ if err := z.Reset(r); err != nil {
+ d.pool.Put(z)
+ return nil, err
+ }
}
- defer z.Close()
+
+ defer func() {
+ z.Close()
+ d.pool.Put(z)
+ }()
return ioutil.ReadAll(z)
}
diff --git a/rpc_util_test.go b/rpc_util_test.go
index 8c92b96..b2b85c7 100644
--- a/rpc_util_test.go
+++ b/rpc_util_test.go
@@ -130,7 +130,7 @@
// outputs
err error
}{
- {make([]byte, 1024), &gzipCompressor{}, &gzipDecompressor{}, nil},
+ {make([]byte, 1024), NewGZIPCompressor(), NewGZIPDecompressor(), nil},
} {
b := new(bytes.Buffer)
if err := test.cp.Do(b, test.data); err != test.err {
@@ -202,3 +202,40 @@
func BenchmarkEncode1MiB(b *testing.B) {
bmEncode(b, 1024*1024)
}
+
+// bmCompressor benchmarks a compressor of a Protocol Buffer message containing
+// mSize bytes.
+func bmCompressor(b *testing.B, mSize int, cp Compressor) {
+ payload := make([]byte, mSize)
+ cBuf := bytes.NewBuffer(make([]byte, mSize))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ cp.Do(cBuf, payload)
+ cBuf.Reset()
+ }
+}
+
+func BenchmarkGZIPCompressor1B(b *testing.B) {
+ bmCompressor(b, 1, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor1KiB(b *testing.B) {
+ bmCompressor(b, 1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor8KiB(b *testing.B) {
+ bmCompressor(b, 8*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor64KiB(b *testing.B) {
+ bmCompressor(b, 64*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor512KiB(b *testing.B) {
+ bmCompressor(b, 512*1024, NewGZIPCompressor())
+}
+
+func BenchmarkGZIPCompressor1MiB(b *testing.B) {
+ bmCompressor(b, 1024*1024, NewGZIPCompressor())
+}