leveldb: partially rewrite batch and db writer

The new batch implementation is cleaner: the batch header is now kept
separate from the batch buffer. A record index has also been added, so
replaying a batch should be much more performant now.
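
For reference, replaying still goes through the public BatchReplay
interface; only the internals changed. A minimal sketch (the
printReplay type is just an illustration):

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/syndtr/goleveldb/leveldb"
    )

    // printReplay implements leveldb.BatchReplay.
    type printReplay struct{}

    func (printReplay) Put(key, value []byte) { fmt.Printf("put %q=%q\n", key, value) }
    func (printReplay) Delete(key []byte)     { fmt.Printf("del %q\n", key) }

    func main() {
    	b := new(leveldb.Batch)
    	b.Put([]byte("k1"), []byte("v1"))
    	b.Delete([]byte("k1"))
    	// Replay now walks the in-memory record index instead of
    	// re-decoding the whole buffer.
    	if err := b.Replay(printReplay{}); err != nil {
    		log.Fatal(err)
    	}
    }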

The DB writer now guarantees that the content of a user batch won't be
modified. The previous implementation appended merged batches directly
into the user batch during write merge, which is plainly wrong and can
potentially cause issues. Options to disable write merge have also been
added.
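
A minimal usage sketch of the new knob; NoWriteMerge is available on
both opt.Options (DB-wide) and opt.WriteOptions (per write), and
"testdb" is just a placeholder path:

    package main

    import (
    	"log"

    	"github.com/syndtr/goleveldb/leveldb"
    	"github.com/syndtr/goleveldb/leveldb/opt"
    )

    func main() {
    	// Disable write merge for the whole DB...
    	db, err := leveldb.OpenFile("testdb", &opt.Options{NoWriteMerge: true})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// ...or for a single write only.
    	wo := &opt.WriteOptions{NoWriteMerge: true}
    	if err := db.Put([]byte("k"), []byte("v"), wo); err != nil {
    		log.Fatal(err)
    	}
    }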

Internal corruption was detected in issue #155: a batch apparently
contains an invalid batch header and a record with an invalid length,
and many batches with invalid sequence numbers are present in the
journal. Hopefully this change fixes some of that.
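
For context, the journal batch header being validated is 12 bytes: an
8-byte little-endian sequence number followed by a 4-byte record count.
A sketch mirroring the decodeBatchHeader added below (parseBatchHeader
is only an illustrative stand-in):

    package main

    import (
    	"encoding/binary"
    	"errors"
    	"fmt"
    )

    // parseBatchHeader mirrors the internal decodeBatchHeader.
    func parseBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
    	if len(data) < 12 { // batchHeaderLen = 8 + 4
    		return 0, 0, errors.New("leveldb: batch corrupted: too short")
    	}
    	seq = binary.LittleEndian.Uint64(data)
    	batchLen = int(binary.LittleEndian.Uint32(data[8:]))
    	if batchLen < 0 { // record count overflows a 32-bit int
    		return 0, 0, errors.New("leveldb: batch corrupted: invalid records length")
    	}
    	return seq, batchLen, nil
    }

    func main() {
    	// Header for seq=1, 2 records.
    	fmt.Println(parseBatchHeader([]byte{1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0}))
    }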
diff --git a/leveldb/batch.go b/leveldb/batch.go
index 5010067..407bc72 100644
--- a/leveldb/batch.go
+++ b/leveldb/batch.go
@@ -7,8 +7,10 @@
 package leveldb
 
 import (
+	"bufio"
 	"encoding/binary"
 	"fmt"
+	"io"
 
 	"github.com/syndtr/goleveldb/leveldb/errors"
 	"github.com/syndtr/goleveldb/leveldb/memdb"
@@ -29,8 +31,9 @@
 }
 
 const (
-	batchHdrLen  = 8 + 4
-	batchGrowRec = 3000
+	batchHeaderLen = 8 + 4
+	batchGrowRec   = 3000
+	batchBufioSize = 16
 )
 
 // BatchReplay wraps basic batch operations.
@@ -39,34 +42,46 @@
 	Delete(key []byte)
 }
 
+type batchIndex struct {
+	keyType            keyType
+	keyPos, keyLen     int
+	valuePos, valueLen int
+}
+
+func (index batchIndex) k(data []byte) []byte {
+	return data[index.keyPos : index.keyPos+index.keyLen]
+}
+
+func (index batchIndex) v(data []byte) []byte {
+	if index.valueLen != 0 {
+		return data[index.valuePos : index.valuePos+index.valueLen]
+	}
+	return nil
+}
+
+func (index batchIndex) kv(data []byte) (key, value []byte) {
+	return index.k(data), index.v(data)
+}
+
 // Batch is a write batch.
 type Batch struct {
-	data       []byte
-	rLen, bLen int
-	seq        uint64
-	sync       bool
+	data  []byte
+	index []batchIndex
+
+	// internalLen is the sum of key/value pair lengths plus 8 bytes per record for the internal key.
+	internalLen int
 }
 
 func (b *Batch) grow(n int) {
-	off := len(b.data)
-	if off == 0 {
-		off = batchHdrLen
-		if b.data != nil {
-			b.data = b.data[:off]
+	o := len(b.data)
+	if cap(b.data)-o < n {
+		div := 1
+		if len(b.index) > batchGrowRec {
+			div = len(b.index) / batchGrowRec
 		}
-	}
-	if cap(b.data)-off < n {
-		if b.data == nil {
-			b.data = make([]byte, off, off+n)
-		} else {
-			odata := b.data
-			div := 1
-			if b.rLen > batchGrowRec {
-				div = b.rLen / batchGrowRec
-			}
-			b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
-			copy(b.data, odata)
-		}
+		ndata := make([]byte, o, o+n+o/div)
+		copy(ndata, b.data)
+		b.data = ndata
 	}
 }
 
@@ -76,32 +91,36 @@
 		n += binary.MaxVarintLen32 + len(value)
 	}
 	b.grow(n)
-	off := len(b.data)
-	data := b.data[:off+n]
-	data[off] = byte(kt)
-	off++
-	off += binary.PutUvarint(data[off:], uint64(len(key)))
-	copy(data[off:], key)
-	off += len(key)
+	index := batchIndex{keyType: kt}
+	o := len(b.data)
+	data := b.data[:o+n]
+	data[o] = byte(kt)
+	o++
+	o += binary.PutUvarint(data[o:], uint64(len(key)))
+	index.keyPos = o
+	index.keyLen = len(key)
+	o += copy(data[o:], key)
 	if kt == keyTypeVal {
-		off += binary.PutUvarint(data[off:], uint64(len(value)))
-		copy(data[off:], value)
-		off += len(value)
+		o += binary.PutUvarint(data[o:], uint64(len(value)))
+		index.valuePos = o
+		index.valueLen = len(value)
+		o += copy(data[o:], value)
 	}
-	b.data = data[:off]
-	b.rLen++
-	//  Include 8-byte ikey header
-	b.bLen += len(key) + len(value) + 8
+	b.data = data[:o]
+	b.index = append(b.index, index)
+	b.internalLen += index.keyLen + index.valueLen + 8
 }
 
 // Put appends 'put operation' of the given key/value pair to the batch.
-// It is safe to modify the contents of the argument after Put returns.
+// It is safe to modify the contents of the argument after Put returns but not
+// before.
 func (b *Batch) Put(key, value []byte) {
 	b.appendRec(keyTypeVal, key, value)
 }
 
 // Delete appends 'delete operation' of the given key to the batch.
-// It is safe to modify the contents of the argument after Delete returns.
+// It is safe to modify the contents of the argument after Delete returns but
+// not before.
 func (b *Batch) Delete(key []byte) {
 	b.appendRec(keyTypeDel, key, nil)
 }
@@ -111,7 +130,7 @@
 // The returned slice is not its own copy, so the contents should not be
 // modified.
 func (b *Batch) Dump() []byte {
-	return b.encode()
+	return b.data
 }
 
 // Load loads given slice into the batch. Previous contents of the batch
@@ -119,144 +138,302 @@
 // The given slice will not be copied and will be used as batch buffer, so
 // it is not safe to modify the contents of the slice.
 func (b *Batch) Load(data []byte) error {
-	return b.decode(0, data)
+	return b.decode(data, -1)
 }
 
 // Replay replays batch contents.
 func (b *Batch) Replay(r BatchReplay) error {
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		switch kt {
+	for _, index := range b.index {
+		switch index.keyType {
 		case keyTypeVal:
-			r.Put(key, value)
+			r.Put(index.k(b.data), index.v(b.data))
 		case keyTypeDel:
-			r.Delete(key)
+			r.Delete(index.k(b.data))
 		}
-		return nil
-	})
+	}
+	return nil
 }
 
 // Len returns number of records in the batch.
 func (b *Batch) Len() int {
-	return b.rLen
+	return len(b.index)
 }
 
 // Reset resets the batch.
 func (b *Batch) Reset() {
 	b.data = b.data[:0]
-	b.seq = 0
-	b.rLen = 0
-	b.bLen = 0
-	b.sync = false
+	b.index = b.index[:0]
+	b.internalLen = 0
 }
 
-func (b *Batch) init(sync bool) {
-	b.sync = sync
-}
-
-func (b *Batch) append(p *Batch) {
-	if p.rLen > 0 {
-		b.grow(len(p.data) - batchHdrLen)
-		b.data = append(b.data, p.data[batchHdrLen:]...)
-		b.rLen += p.rLen
-		b.bLen += p.bLen
-	}
-	if p.sync {
-		b.sync = true
-	}
-}
-
-// size returns sums of key/value pair length plus 8-bytes ikey.
-func (b *Batch) size() int {
-	return b.bLen
-}
-
-func (b *Batch) encode() []byte {
-	b.grow(0)
-	binary.LittleEndian.PutUint64(b.data, b.seq)
-	binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
-
-	return b.data
-}
-
-func (b *Batch) decode(prevSeq uint64, data []byte) error {
-	if len(data) < batchHdrLen {
-		return newErrBatchCorrupted("too short")
-	}
-
-	b.seq = binary.LittleEndian.Uint64(data)
-	if b.seq < prevSeq {
-		return newErrBatchCorrupted("invalid sequence number")
-	}
-	b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
-	if b.rLen < 0 {
-		return newErrBatchCorrupted("invalid records length")
-	}
-	// No need to be precise at this point, it won't be used anyway
-	b.bLen = len(data) - batchHdrLen
-	b.data = data
-
-	return nil
-}
-
-func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
-	off := batchHdrLen
-	for i := 0; i < b.rLen; i++ {
-		if off >= len(b.data) {
-			return newErrBatchCorrupted("invalid records length")
-		}
-
-		kt := keyType(b.data[off])
-		if kt > keyTypeVal {
-			panic(kt)
-			return newErrBatchCorrupted("bad record: invalid type")
-		}
-		off++
-
-		x, n := binary.Uvarint(b.data[off:])
-		off += n
-		if n <= 0 || off+int(x) > len(b.data) {
-			return newErrBatchCorrupted("bad record: invalid key length")
-		}
-		key := b.data[off : off+int(x)]
-		off += int(x)
-		var value []byte
-		if kt == keyTypeVal {
-			x, n := binary.Uvarint(b.data[off:])
-			off += n
-			if n <= 0 || off+int(x) > len(b.data) {
-				return newErrBatchCorrupted("bad record: invalid value length")
-			}
-			value = b.data[off : off+int(x)]
-			off += int(x)
-		}
-
-		if err := f(i, kt, key, value); err != nil {
+func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
+	for i, index := range b.index {
+		if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
 			return err
 		}
 	}
-
 	return nil
 }
 
-func (b *Batch) memReplay(to *memdb.DB) error {
-	var ikScratch []byte
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
-		return to.Put(ikScratch, value)
-	})
+func (b *Batch) append(p *Batch) {
+	ob := len(b.data)
+	oi := len(b.index)
+	b.data = append(b.data, p.data...)
+	b.index = append(b.index, p.index...)
+	b.internalLen += p.internalLen
+
+	// Update the index offsets.
+	if ob != 0 {
+		for ; oi < len(b.index); oi++ {
+			index := &b.index[oi]
+			index.keyPos += ob
+			if index.valueLen != 0 {
+				index.valuePos += ob
+			}
+		}
+	}
 }
 
-func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
-	if err := b.decode(prevSeq, data); err != nil {
+func (b *Batch) decode(data []byte, expectedLen int) error {
+	b.data = data
+	b.index = b.index[:0]
+	b.internalLen = 0
+	err := decodeBatch(data, func(i int, index batchIndex) error {
+		b.index = append(b.index, index)
+		b.internalLen += index.keyLen + index.valueLen + 8
+		return nil
+	})
+	if err != nil {
 		return err
 	}
-	return b.memReplay(to)
+	if expectedLen >= 0 && len(b.index) != expectedLen {
+		return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
+	}
+	return nil
 }
 
-func (b *Batch) revertMemReplay(to *memdb.DB) error {
-	var ikScratch []byte
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
-		return to.Delete(ikScratch)
+func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
+	var ik []byte
+	for i, index := range b.index {
+		ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+		if err := mdb.Put(ik, index.v(b.data)); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
+	var ik []byte
+	for i, index := range b.index {
+		ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
+		if err := mdb.Delete(ik); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func newBatch() interface{} {
+	return &Batch{}
+}
+
+func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
+	var index batchIndex
+	for i, o := 0, 0; o < len(data); i++ {
+		// Key type.
+		index.keyType = keyType(data[o])
+		if index.keyType > keyTypeVal {
+			return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
+		}
+		o++
+
+		// Key.
+		x, n := binary.Uvarint(data[o:])
+		o += n
+		if n <= 0 || o+int(x) > len(data) {
+			return newErrBatchCorrupted("bad record: invalid key length")
+		}
+		index.keyPos = o
+		index.keyLen = int(x)
+		o += index.keyLen
+
+		// Value.
+		if index.keyType == keyTypeVal {
+			x, n = binary.Uvarint(data[o:])
+			o += n
+			if n <= 0 || o+int(x) > len(data) {
+				return newErrBatchCorrupted("bad record: invalid value length")
+			}
+			index.valuePos = o
+			index.valueLen = int(x)
+			o += index.valueLen
+		} else {
+			index.valuePos = 0
+			index.valueLen = 0
+		}
+
+		if err := fn(i, index); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
+	seq, batchLen, err = decodeBatchHeader(data)
+	if err != nil {
+		return 0, 0, err
+	}
+	if seq < expectSeq {
+		return 0, 0, newErrBatchCorrupted("invalid sequence number")
+	}
+	data = data[batchHeaderLen:]
+	var ik []byte
+	var decodedLen int
+	err = decodeBatch(data, func(i int, index batchIndex) error {
+		if i >= batchLen {
+			return newErrBatchCorrupted("invalid records length")
+		}
+		ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
+		if err := mdb.Put(ik, index.v(data)); err != nil {
+			return err
+		}
+		decodedLen++
+		return nil
 	})
+	if err == nil && decodedLen != batchLen {
+		err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
+	}
+	return
+}
+
+func readBatch(br *bufio.Reader, fn func(i int, kt keyType, k, v []byte) error) error {
+	var k, v []byte
+	for i := 0; ; i++ {
+		// Key type.
+		c, err := br.ReadByte()
+		if err != nil {
+			if err == io.EOF {
+				err = nil
+			}
+			return err
+		}
+		kt := keyType(c)
+		if kt > keyTypeVal {
+			return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(kt)))
+		}
+
+		// Key.
+		x, err := binary.ReadUvarint(br)
+		if err == nil {
+			k = ensureBuffer(k, int(x))
+			_, err = io.ReadFull(br, k)
+		}
+		if err != nil {
+			if err == io.ErrUnexpectedEOF {
+				err = newErrBatchCorrupted("bad record: invalid key length")
+			}
+			return err
+		}
+
+		// Value.
+		if kt == keyTypeVal {
+			x, err = binary.ReadUvarint(br)
+			if err == nil {
+				v = ensureBuffer(v, int(x))
+				_, err = io.ReadFull(br, v)
+			}
+			if err != nil {
+				if err == io.ErrUnexpectedEOF {
+					err = newErrBatchCorrupted("bad record: invalid value length")
+				}
+				return err
+			}
+
+			if err := fn(i, kt, k, v); err != nil {
+				return err
+			}
+		} else {
+			if err := fn(i, kt, k, nil); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func readBatchToMem(br *bufio.Reader, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
+	header, err := br.Peek(batchHeaderLen)
+	if err != nil && err != io.EOF {
+		// On io.EOF we fall through and let decodeBatchHeader return the "too short" error.
+		return 0, 0, err
+	}
+	seq, batchLen, err = decodeBatchHeader(header)
+	if err != nil {
+		return 0, 0, err
+	}
+	if seq < expectSeq {
+		return 0, 0, newErrBatchCorrupted("invalid sequence number")
+	}
+	// Discard is guaranteed to succeed without reading from the underlying
+	// io.Reader.
+	br.Discard(batchHeaderLen)
+	var ik []byte
+	var decodedLen int
+	err = readBatch(br, func(i int, kt keyType, k, v []byte) error {
+		if i >= batchLen {
+			return newErrBatchCorrupted("invalid records length")
+		}
+		ik = makeInternalKey(ik, k, seq+uint64(i), kt)
+		if err := mdb.Put(ik, v); err != nil {
+			return err
+		}
+		decodedLen++
+		return nil
+	})
+	if err == nil && decodedLen != batchLen {
+		err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
+	}
+	return
+}
+
+func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
+	dst = ensureBuffer(dst, batchHeaderLen)
+	binary.LittleEndian.PutUint64(dst, seq)
+	binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
+	return dst
+}
+
+func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
+	if len(data) < batchHeaderLen {
+		return 0, 0, newErrBatchCorrupted("too short")
+	}
+
+	seq = binary.LittleEndian.Uint64(data)
+	batchLen = int(binary.LittleEndian.Uint32(data[8:]))
+	if batchLen < 0 {
+		return 0, 0, newErrBatchCorrupted("invalid records length")
+	}
+	return
+}
+
+func batchesLen(batches []*Batch) int {
+	batchLen := 0
+	for _, batch := range batches {
+		batchLen += batch.Len()
+	}
+	return batchLen
+}
+
+func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
+	if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
+		return err
+	}
+	for _, batch := range batches {
+		if _, err := wr.Write(batch.data); err != nil {
+			return err
+		}
+	}
+	return nil
 }
diff --git a/leveldb/batch_test.go b/leveldb/batch_test.go
index ce0925e..b37ed7c 100644
--- a/leveldb/batch_test.go
+++ b/leveldb/batch_test.go
@@ -7,117 +7,146 @@
 package leveldb
 
 import (
+	"bufio"
 	"bytes"
+	"fmt"
 	"testing"
+	"testing/quick"
 
-	"github.com/syndtr/goleveldb/leveldb/comparer"
-	"github.com/syndtr/goleveldb/leveldb/memdb"
+	"github.com/syndtr/goleveldb/leveldb/testutil"
 )
 
-type tbRec struct {
-	kt         keyType
-	key, value []byte
+func TestBatchHeader(t *testing.T) {
+	f := func(seq uint64, length uint32) bool {
+		encoded := encodeBatchHeader(nil, seq, int(length))
+		decSeq, decLength, err := decodeBatchHeader(encoded)
+		return err == nil && decSeq == seq && decLength == int(length)
+	}
+	config := &quick.Config{
+		Rand: testutil.NewRand(),
+	}
+	if err := quick.Check(f, config); err != nil {
+		t.Error(err)
+	}
 }
 
-type testBatch struct {
-	rec []*tbRec
+type batchKV struct {
+	kt   keyType
+	k, v []byte
 }
 
-func (p *testBatch) Put(key, value []byte) {
-	p.rec = append(p.rec, &tbRec{keyTypeVal, key, value})
-}
-
-func (p *testBatch) Delete(key []byte) {
-	p.rec = append(p.rec, &tbRec{keyTypeDel, key, nil})
-}
-
-func compareBatch(t *testing.T, b1, b2 *Batch) {
-	if b1.seq != b2.seq {
-		t.Errorf("invalid seq number want %d, got %d", b1.seq, b2.seq)
-	}
-	if b1.Len() != b2.Len() {
-		t.Fatalf("invalid record length want %d, got %d", b1.Len(), b2.Len())
-	}
-	p1, p2 := new(testBatch), new(testBatch)
-	err := b1.Replay(p1)
-	if err != nil {
-		t.Fatal("error when replaying batch 1: ", err)
-	}
-	err = b2.Replay(p2)
-	if err != nil {
-		t.Fatal("error when replaying batch 2: ", err)
-	}
-	for i := range p1.rec {
-		r1, r2 := p1.rec[i], p2.rec[i]
-		if r1.kt != r2.kt {
-			t.Errorf("invalid type on record '%d' want %d, got %d", i, r1.kt, r2.kt)
+func TestBatch(t *testing.T) {
+	var (
+		kvs         []batchKV
+		internalLen int
+	)
+	batch := new(Batch)
+	rbatch := new(Batch)
+	abatch := new(Batch)
+	testBatch := func(i int, kt keyType, k, v []byte) error {
+		kv := kvs[i]
+		if kv.kt != kt {
+			return fmt.Errorf("invalid key type, index=%d: %d vs %d", i, kv.kt, kt)
 		}
-		if !bytes.Equal(r1.key, r2.key) {
-			t.Errorf("invalid key on record '%d' want %s, got %s", i, string(r1.key), string(r2.key))
+		if !bytes.Equal(kv.k, k) {
+			return fmt.Errorf("invalid key, index=%d", i)
 		}
-		if r1.kt == keyTypeVal {
-			if !bytes.Equal(r1.value, r2.value) {
-				t.Errorf("invalid value on record '%d' want %s, got %s", i, string(r1.value), string(r2.value))
+		if !bytes.Equal(kv.v, v) {
+			return fmt.Errorf("invalid value, index=%d", i)
+		}
+		return nil
+	}
+	f := func(ktr uint8, k, v []byte) bool {
+		kt := keyType(ktr % 2)
+		if kt == keyTypeVal {
+			batch.Put(k, v)
+			rbatch.Put(k, v)
+			kvs = append(kvs, batchKV{kt: kt, k: k, v: v})
+			internalLen += len(k) + len(v) + 8
+		} else {
+			batch.Delete(k)
+			rbatch.Delete(k)
+			kvs = append(kvs, batchKV{kt: kt, k: k})
+			internalLen += len(k) + 8
+		}
+		if batch.Len() != len(kvs) {
+			t.Logf("batch.Len: %d vs %d", len(kvs), batch.Len())
+			return false
+		}
+		if batch.internalLen != internalLen {
+			t.Logf("abatch.internalLen: %d vs %d", internalLen, batch.internalLen)
+			return false
+		}
+		if len(kvs)%1000 == 0 {
+			if err := batch.replayInternal(testBatch); err != nil {
+				t.Logf("batch.replayInternal: %v", err)
+				return false
+			}
+			if err := readBatch(bufio.NewReader(bytes.NewReader(batch.data)), testBatch); err != nil {
+				t.Logf("readBatch: %v", err)
+				return false
+			}
+
+			abatch.append(rbatch)
+			rbatch.Reset()
+			if abatch.Len() != len(kvs) {
+				t.Logf("abatch.Len: %d vs %d", len(kvs), abatch.Len())
+				return false
+			}
+			if abatch.internalLen != internalLen {
+				t.Logf("abatch.internalLen: %d vs %d", internalLen, abatch.internalLen)
+				return false
+			}
+			if err := abatch.replayInternal(testBatch); err != nil {
+				t.Logf("abatch.replayInternal: %v", err)
+				return false
+			}
+
+			nbatch := new(Batch)
+			if err := nbatch.Load(batch.Dump()); err != nil {
+				t.Logf("nbatch.Load: %v", err)
+				return false
+			}
+			if nbatch.Len() != len(kvs) {
+				t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len())
+				return false
+			}
+			if nbatch.internalLen != internalLen {
+				t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen)
+				return false
+			}
+			if err := nbatch.replayInternal(testBatch); err != nil {
+				t.Logf("nbatch.replayInternal: %v", err)
+				return false
 			}
 		}
-	}
-}
-
-func TestBatch_EncodeDecode(t *testing.T) {
-	b1 := new(Batch)
-	b1.seq = 10009
-	b1.Put([]byte("key1"), []byte("value1"))
-	b1.Put([]byte("key2"), []byte("value2"))
-	b1.Delete([]byte("key1"))
-	b1.Put([]byte("k"), []byte(""))
-	b1.Put([]byte("zzzzzzzzzzz"), []byte("zzzzzzzzzzzzzzzzzzzzzzzz"))
-	b1.Delete([]byte("key10000"))
-	b1.Delete([]byte("k"))
-	buf := b1.encode()
-	b2 := new(Batch)
-	err := b2.decode(0, buf)
-	if err != nil {
-		t.Error("error when decoding batch: ", err)
-	}
-	compareBatch(t, b1, b2)
-}
-
-func TestBatch_Append(t *testing.T) {
-	b1 := new(Batch)
-	b1.seq = 10009
-	b1.Put([]byte("key1"), []byte("value1"))
-	b1.Put([]byte("key2"), []byte("value2"))
-	b1.Delete([]byte("key1"))
-	b1.Put([]byte("foo"), []byte("foovalue"))
-	b1.Put([]byte("bar"), []byte("barvalue"))
-	b2a := new(Batch)
-	b2a.seq = 10009
-	b2a.Put([]byte("key1"), []byte("value1"))
-	b2a.Put([]byte("key2"), []byte("value2"))
-	b2a.Delete([]byte("key1"))
-	b2b := new(Batch)
-	b2b.Put([]byte("foo"), []byte("foovalue"))
-	b2b.Put([]byte("bar"), []byte("barvalue"))
-	b2a.append(b2b)
-	compareBatch(t, b1, b2a)
-	if b1.size() != b2a.size() {
-		t.Fatalf("invalid batch size want %d, got %d", b1.size(), b2a.size())
-	}
-}
-
-func TestBatch_Size(t *testing.T) {
-	b := new(Batch)
-	for i := 0; i < 2; i++ {
-		b.Put([]byte("key1"), []byte("value1"))
-		b.Put([]byte("key2"), []byte("value2"))
-		b.Delete([]byte("key1"))
-		b.Put([]byte("foo"), []byte("foovalue"))
-		b.Put([]byte("bar"), []byte("barvalue"))
-		mem := memdb.New(&iComparer{comparer.DefaultComparer}, 0)
-		b.memReplay(mem)
-		if b.size() != mem.Size() {
-			t.Errorf("invalid batch size calculation, want=%d got=%d", mem.Size(), b.size())
+		if len(kvs)%10000 == 0 {
+			nbatch := new(Batch)
+			if err := batch.Replay(nbatch); err != nil {
+				t.Logf("batch.Replay: %v", err)
+				return false
+			}
+			if nbatch.Len() != len(kvs) {
+				t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len())
+				return false
+			}
+			if nbatch.internalLen != internalLen {
+				t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen)
+				return false
+			}
+			if err := nbatch.replayInternal(testBatch); err != nil {
+				t.Logf("nbatch.replayInternal: %v", err)
+				return false
+			}
 		}
-		b.Reset()
+		return true
 	}
+	config := &quick.Config{
+		MaxCount: 40000,
+		Rand:     testutil.NewRand(),
+	}
+	if err := quick.Check(f, config); err != nil {
+		t.Error(err)
+	}
+	t.Logf("length=%d internalLen=%d", len(kvs), internalLen)
 }
diff --git a/leveldb/bench_test.go b/leveldb/bench_test.go
index 12a8496..435250c 100644
--- a/leveldb/bench_test.go
+++ b/leveldb/bench_test.go
@@ -104,7 +104,6 @@
 		b.Fatal("cannot open db: ", err)
 	}
 
-	runtime.GOMAXPROCS(runtime.NumCPU())
 	return p
 }
 
@@ -260,7 +259,6 @@
 	p.keys = nil
 	p.values = nil
 	runtime.GC()
-	runtime.GOMAXPROCS(1)
 }
 
 func BenchmarkDBWrite(b *testing.B) {
diff --git a/leveldb/db.go b/leveldb/db.go
index 1aed209..bf1502e 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -53,14 +53,13 @@
 	aliveSnaps, aliveIters int32
 
 	// Write.
-	writeC       chan *Batch
+	batchPool    sync.Pool
+	writeMergeC  chan writeMerge
 	writeMergedC chan bool
 	writeLockC   chan struct{}
 	writeAckC    chan error
 	writeDelay   time.Duration
 	writeDelayN  int
-	journalC     chan *Batch
-	journalAckC  chan error
 	tr           *Transaction
 
 	// Compaction.
@@ -94,12 +93,11 @@
 		// Snapshot
 		snapsList: list.New(),
 		// Write
-		writeC:       make(chan *Batch),
+		batchPool:    sync.Pool{New: newBatch},
+		writeMergeC:  make(chan writeMerge),
 		writeMergedC: make(chan bool),
 		writeLockC:   make(chan struct{}, 1),
 		writeAckC:    make(chan error),
-		journalC:     make(chan *Batch),
-		journalAckC:  make(chan error),
 		// Compaction
 		tcompCmdC:   make(chan cCmd),
 		tcompPauseC: make(chan chan<- struct{}),
@@ -144,10 +142,9 @@
 	if readOnly {
 		db.SetReadOnly()
 	} else {
-		db.closeW.Add(3)
+		db.closeW.Add(2)
 		go db.tCompaction()
 		go db.mCompaction()
-		go db.jWriter()
 	}
 
 	s.logf("db@open done T·%v", time.Since(start))
@@ -504,10 +502,11 @@
 			checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
 			writeBuffer = db.s.o.GetWriteBuffer()
 
-			jr    *journal.Reader
-			mdb   = memdb.New(db.s.icmp, writeBuffer)
-			buf   = &util.Buffer{}
-			batch = &Batch{}
+			jr       *journal.Reader
+			mdb      = memdb.New(db.s.icmp, writeBuffer)
+			buf      = &util.Buffer{}
+			batchSeq uint64
+			batchLen int
 		)
 
 		for _, fd := range fds {
@@ -569,7 +568,8 @@
 					fr.Close()
 					return errors.SetFd(err, fd)
 				}
-				if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+				if err != nil {
 					if !strict && errors.IsCorrupted(err) {
 						db.s.logf("journal error: %v (skipped)", err)
 						// We won't apply sequence number as it might be corrupted.
@@ -581,7 +581,7 @@
 				}
 
 				// Save sequence number.
-				db.seq = batch.seq + uint64(batch.Len())
+				db.seq = batchSeq + uint64(batchLen)
 
 				// Flush it if large enough.
 				if mdb.Size() >= writeBuffer {
@@ -661,9 +661,10 @@
 		db.logf("journal@recovery RO·Mode F·%d", len(fds))
 
 		var (
-			jr    *journal.Reader
-			buf   = &util.Buffer{}
-			batch = &Batch{}
+			jr       *journal.Reader
+			buf      = &util.Buffer{}
+			batchSeq uint64
+			batchLen int
 		)
 
 		for _, fd := range fds {
@@ -703,7 +704,8 @@
 					fr.Close()
 					return errors.SetFd(err, fd)
 				}
-				if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+				batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
+				if err != nil {
 					if !strict && errors.IsCorrupted(err) {
 						db.s.logf("journal error: %v (skipped)", err)
 						// We won't apply sequence number as it might be corrupted.
@@ -715,7 +717,7 @@
 				}
 
 				// Save sequence number.
-				db.seq = batch.seq + uint64(batch.Len())
+				db.seq = batchSeq + uint64(batchLen)
 			}
 
 			fr.Close()
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 6eaf0d5..602376b 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -1877,7 +1877,7 @@
 }
 
 func TestDB_ConcurrentWrite(t *testing.T) {
-	const n, niter = 10, 10000
+	const n, bk, niter = 10, 3, 10000
 	h := newDbHarness(t)
 	defer h.close()
 
@@ -1889,7 +1889,7 @@
 		go func(i int) {
 			defer wg.Done()
 			for k := 0; k < niter; k++ {
-				kstr := fmt.Sprintf("%d.%d", i, k)
+				kstr := fmt.Sprintf("put-%d.%d", i, k)
 				vstr := fmt.Sprintf("v%d", k)
 				h.put(kstr, vstr)
 				// Key should immediately available after put returns.
@@ -1897,6 +1897,24 @@
 			}
 		}(i)
 	}
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		batch := &Batch{}
+		go func(i int) {
+			defer wg.Done()
+			for k := 0; k < niter; k++ {
+				batch.Reset()
+				for j := 0; j < bk; j++ {
+					batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
+				}
+				h.write(batch)
+				// Keys should be immediately available after write returns.
+				for j := 0; j < bk; j++ {
+					h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
+				}
+			}
+		}(i)
+	}
 	wg.Wait()
 }
 
diff --git a/leveldb/db_transaction.go b/leveldb/db_transaction.go
index fca8803..e3781ff 100644
--- a/leveldb/db_transaction.go
+++ b/leveldb/db_transaction.go
@@ -167,8 +167,8 @@
 	if tr.closed {
 		return errTransactionDone
 	}
-	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
-		return tr.put(kt, key, value)
+	return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
+		return tr.put(kt, k, v)
 	})
 }
 
@@ -246,8 +246,8 @@
 }
 
 // OpenTransaction opens an atomic DB transaction. Only one transaction can be
-// opened at a time. Write will be blocked until the transaction is committed or
-// discarded.
+// opened at a time. Subsequent calls to Write and OpenTransaction will be
+// blocked until the in-flight transaction is committed or discarded.
 // The returned transaction handle is goroutine-safe.
 //
 // The transaction must be closed once done, either by committing or discarding
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index 37acb3f..5fc9eda 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -14,37 +14,23 @@
 	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-func (db *DB) writeJournal(b *Batch) error {
-	w, err := db.journal.Next()
+func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
+	wr, err := db.journal.Next()
 	if err != nil {
 		return err
 	}
-	if _, err := w.Write(b.encode()); err != nil {
+	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
 		return err
 	}
 	if err := db.journal.Flush(); err != nil {
 		return err
 	}
-	if b.sync {
+	if sync {
 		return db.journalWriter.Sync()
 	}
 	return nil
 }
 
-func (db *DB) jWriter() {
-	defer db.closeW.Done()
-	for {
-		select {
-		case b := <-db.journalC:
-			if b != nil {
-				db.journalAckC <- db.writeJournal(b)
-			}
-		case _, _ = <-db.closeC:
-			return
-		}
-	}
-}
-
 func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
 	// Wait for pending memdb compaction.
 	err = db.compTriggerWait(db.mcompCmdC)
@@ -127,159 +113,250 @@
 	return
 }
 
-// Write apply the given batch to the DB. The batch will be applied
-// sequentially.
-//
-// It is safe to modify the contents of the arguments after Write returns.
-func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
-	err = db.ok()
-	if err != nil || b == nil || b.Len() == 0 {
-		return
+type writeMerge struct {
+	sync       bool
+	batch      *Batch
+	keyType    keyType
+	key, value []byte
+}
+
+func (db *DB) unlockWrite(overflow bool, merged int, err error) {
+	for i := 0; i < merged; i++ {
+		db.writeAckC <- err
+	}
+	if overflow {
+		// Pass lock to the next write (that failed to merge).
+		db.writeMergedC <- false
+	} else {
+		// Release lock.
+		<-db.writeLockC
+	}
+}
+
+// ourBatch, if defined, should be equal to batch.
+func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
+	// Try to flush the memdb. This method also tries to throttle writes
+	// if they come in too fast and compaction cannot catch up.
+	mdb, mdbFree, err := db.flush(batch.internalLen)
+	if err != nil {
+		db.unlockWrite(false, 0, err)
+		return err
+	}
+	defer mdb.decref()
+
+	var (
+		overflow bool
+		merged   int
+		batches  = []*Batch{batch}
+	)
+
+	if merge {
+		// Merge limit.
+		var mergeLimit int
+		if batch.internalLen > 128<<10 {
+			mergeLimit = (1 << 20) - batch.internalLen
+		} else {
+			mergeLimit = 128 << 10
+		}
+		mergeCap := mdbFree - batch.internalLen
+		if mergeLimit > mergeCap {
+			mergeLimit = mergeCap
+		}
+
+	merge:
+		for mergeLimit > 0 {
+			select {
+			case incoming := <-db.writeMergeC:
+				if incoming.batch != nil {
+					// Merge batch.
+					if incoming.batch.internalLen > mergeLimit {
+						overflow = true
+						break merge
+					}
+					batches = append(batches, incoming.batch)
+					mergeLimit -= incoming.batch.internalLen
+				} else {
+					// Merge put.
+					internalLen := len(incoming.key) + len(incoming.value) + 8
+					if internalLen > mergeLimit {
+						overflow = true
+						break merge
+					}
+					if ourBatch == nil {
+						ourBatch = db.batchPool.Get().(*Batch)
+						ourBatch.Reset()
+						batches = append(batches, ourBatch)
+					}
+					// We can use the same batch since concurrent writes don't
+					// guarantee write order.
+					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
+					mergeLimit -= internalLen
+				}
+				sync = sync || incoming.sync
+				merged++
+				db.writeMergedC <- true
+
+			default:
+				break merge
+			}
+		}
 	}
 
-	b.init(wo.GetSync() && !db.s.o.GetNoSync())
+	// Seq number.
+	seq := db.seq + 1
 
-	if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
-		// Writes using transaction.
-		tr, err1 := db.OpenTransaction()
-		if err1 != nil {
-			return err1
+	// Write journal.
+	if err := db.writeJournal(batches, seq, sync); err != nil {
+		db.unlockWrite(overflow, merged, err)
+		return err
+	}
+
+	// Put batches.
+	for _, batch := range batches {
+		if err := batch.putMem(seq, mdb.DB); err != nil {
+			panic(err)
 		}
-		if err1 := tr.Write(b, wo); err1 != nil {
+		seq += uint64(batch.Len())
+	}
+
+	// Incr seq number.
+	db.addSeq(uint64(batchesLen(batches)))
+
+	// Rotate the memdb if it has reached the threshold.
+	if batch.internalLen >= mdbFree {
+		db.rotateMem(0, false)
+	}
+
+	db.unlockWrite(overflow, merged, nil)
+	return nil
+}
+
+// Write applies the given batch to the DB. The batch records will be applied
+// sequentially. Write can be used concurrently; when used concurrently and the
+// batch is small enough, Write will try to merge the batches. Set the
+// NoWriteMerge option to true to disable write merge.
+//
+// It is safe to modify the contents of the arguments after Write returns but
+// not before. Write will not modify the content of the batch.
+func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
+	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
+		return err
+	}
+
+	// If the batch size is larger than the write buffer, it may be justified
+	// to write using a transaction instead. With a transaction the batch will
+	// be written into tables directly, skipping the journaling.
+	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+		tr, err := db.OpenTransaction()
+		if err != nil {
+			return err
+		}
+		if err := tr.Write(batch, wo); err != nil {
 			tr.Discard()
-			return err1
+			return err
 		}
 		return tr.Commit()
 	}
 
-	// The write happen synchronously.
-	select {
-	case db.writeC <- b:
-		if <-db.writeMergedC {
-			return <-db.writeAckC
-		}
-		// Continue, the write lock already acquired by previous writer
-		// and handed out to us.
-	case db.writeLockC <- struct{}{}:
-	case err = <-db.compPerErrC:
-		return
-	case _, _ = <-db.closeC:
-		return ErrClosed
-	}
+	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+	sync := wo.GetSync() && !db.s.o.GetNoSync()
 
-	merged := 0
-	danglingMerge := false
-	defer func() {
-		for i := 0; i < merged; i++ {
-			db.writeAckC <- err
-		}
-		if danglingMerge {
-			// Only one dangling merge at most, so this is safe.
-			db.writeMergedC <- false
-		} else {
-			<-db.writeLockC
-		}
-	}()
-
-	mdb, mdbFree, err := db.flush(b.size())
-	if err != nil {
-		return
-	}
-	defer mdb.decref()
-
-	// Calculate maximum size of the batch.
-	m := 1 << 20
-	if x := b.size(); x <= 128<<10 {
-		m = x + (128 << 10)
-	}
-	m = minInt(m, mdbFree)
-
-	// Merge with other batch.
-drain:
-	for b.size() < m && !b.sync {
+	// Acquire write lock.
+	if merge {
 		select {
-		case nb := <-db.writeC:
-			if b.size()+nb.size() <= m {
-				b.append(nb)
-				db.writeMergedC <- true
-				merged++
-			} else {
-				danglingMerge = true
-				break drain
+		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
+			if <-db.writeMergedC {
+				// Write is merged.
+				return <-db.writeAckC
 			}
-		default:
-			break drain
-		}
-	}
-
-	// Set batch first seq number relative from last seq.
-	b.seq = db.seq + 1
-
-	// Write journal concurrently if it is large enough.
-	if b.size() >= (128 << 10) {
-		// Push the write batch to the journal writer
-		select {
-		case db.journalC <- b:
-			// Write into memdb
-			if berr := b.memReplay(mdb.DB); berr != nil {
-				panic(berr)
-			}
-		case err = <-db.compPerErrC:
-			return
-		case _, _ = <-db.closeC:
-			err = ErrClosed
-			return
-		}
-		// Wait for journal writer
-		select {
-		case err = <-db.journalAckC:
-			if err != nil {
-				// Revert memdb if error detected
-				if berr := b.revertMemReplay(mdb.DB); berr != nil {
-					panic(berr)
-				}
-				return
-			}
-		case _, _ = <-db.closeC:
-			err = ErrClosed
-			return
+			// Write is not merged, the write lock is handed to us. Continue.
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
 		}
 	} else {
-		err = db.writeJournal(b)
-		if err != nil {
-			return
-		}
-		if berr := b.memReplay(mdb.DB); berr != nil {
-			panic(berr)
+		select {
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
 		}
 	}
 
-	// Set last seq number.
-	db.addSeq(uint64(b.Len()))
+	return db.writeLocked(batch, nil, merge, sync)
+}
 
-	if b.size() >= mdbFree {
-		db.rotateMem(0, false)
+func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
+	if err := db.ok(); err != nil {
+		return err
 	}
-	return
+
+	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
+	sync := wo.GetSync() && !db.s.o.GetNoSync()
+
+	// Acquire write lock.
+	if merge {
+		select {
+		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
+			if <-db.writeMergedC {
+				// Write is merged.
+				return <-db.writeAckC
+			}
+			// Write is not merged, the write lock is handed to us. Continue.
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
+		}
+	} else {
+		select {
+		case db.writeLockC <- struct{}{}:
+			// Write lock acquired.
+		case err := <-db.compPerErrC:
+			// Compaction error.
+			return err
+		case <-db.closeC:
+			// Closed
+			return ErrClosed
+		}
+	}
+
+	batch := db.batchPool.Get().(*Batch)
+	batch.Reset()
+	batch.appendRec(kt, key, value)
+	return db.writeLocked(batch, batch, merge, sync)
 }
 
 // Put sets the value for the given key. It overwrites any previous value
-// for that key; a DB is not a multi-map.
+// for that key; a DB is not a multi-map. Write merge also applies to Put, see
+// Write.
 //
-// It is safe to modify the contents of the arguments after Put returns.
+// It is safe to modify the contents of the arguments after Put returns but not
+// before.
 func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
-	b := new(Batch)
-	b.Put(key, value)
-	return db.Write(b, wo)
+	return db.putRec(keyTypeVal, key, value, wo)
 }
 
-// Delete deletes the value for the given key.
+// Delete deletes the value for the given key. Delete will not return an error
+// if the key doesn't exist. Write merge also applies to Delete, see Write.
 //
-// It is safe to modify the contents of the arguments after Delete returns.
+// It is safe to modify the contents of the arguments after Delete returns but
+// not before.
 func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
-	b := new(Batch)
-	b.Delete(key)
-	return db.Write(b, wo)
+	return db.putRec(keyTypeDel, key, nil, wo)
 }
 
 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
diff --git a/leveldb/key.go b/leveldb/key.go
index d0b80aa..ad8f51e 100644
--- a/leveldb/key.go
+++ b/leveldb/key.go
@@ -37,14 +37,14 @@
 	case keyTypeVal:
 		return "v"
 	}
-	return "x"
+	return fmt.Sprintf("<invalid:%#x>", uint(kt))
 }
 
 // Value types encoded as the last component of internal keys.
 // Don't modify; this value are saved to disk.
 const (
-	keyTypeDel keyType = iota
-	keyTypeVal
+	keyTypeDel = keyType(0)
+	keyTypeVal = keyType(1)
 )
 
 // keyTypeSeek defines the keyType that should be passed when constructing an
@@ -79,11 +79,7 @@
 		panic("leveldb: invalid type")
 	}
 
-	if n := len(ukey) + 8; cap(dst) < n {
-		dst = make([]byte, n)
-	} else {
-		dst = dst[:n]
-	}
+	dst = ensureBuffer(dst, len(ukey)+8)
 	copy(dst, ukey)
 	binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
 	return internalKey(dst)
@@ -143,5 +139,5 @@
 	if ukey, seq, kt, err := parseInternalKey(ik); err == nil {
 		return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq)
 	}
-	return "<invalid>"
+	return fmt.Sprintf("<invalid:%#x>", []byte(ik))
 }
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 3d2bf1c..44e7d9a 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -312,6 +312,11 @@
 	// The default is false.
 	NoSync bool
 
+	// NoWriteMerge allows disabling write merge.
+	//
+	// The default is false.
+	NoWriteMerge bool
+
 	// OpenFilesCacher provides cache algorithm for open files caching.
 	// Specify NoCacher to disable caching algorithm.
 	//
@@ -543,6 +548,13 @@
 	return o.NoSync
 }
 
+func (o *Options) GetNoWriteMerge() bool {
+	if o == nil {
+		return false
+	}
+	return o.NoWriteMerge
+}
+
 func (o *Options) GetOpenFilesCacher() Cacher {
 	if o == nil || o.OpenFilesCacher == nil {
 		return DefaultOpenFilesCacher
@@ -629,6 +641,11 @@
 // WriteOptions holds the optional parameters for 'write operation'. The
 // 'write operation' includes Write, Put and Delete.
 type WriteOptions struct {
+	// NoWriteMerge allows disabling write merge.
+	//
+	// The default is false.
+	NoWriteMerge bool
+
 	// Sync is whether to sync underlying writes from the OS buffer cache
 	// through to actual disk, if applicable. Setting Sync can result in
 	// slower writes.
@@ -644,6 +661,13 @@
 	Sync bool
 }
 
+func (wo *WriteOptions) GetNoWriteMerge() bool {
+	if wo == nil {
+		return false
+	}
+	return wo.NoWriteMerge
+}
+
 func (wo *WriteOptions) GetSync() bool {
 	if wo == nil {
 		return false
diff --git a/leveldb/util.go b/leveldb/util.go
index 3b663d1..e572a32 100644
--- a/leveldb/util.go
+++ b/leveldb/util.go
@@ -89,3 +89,10 @@
 func sortFds(fds []storage.FileDesc) {
 	sort.Sort(fdSorter(fds))
 }
+
+func ensureBuffer(b []byte, n int) []byte {
+	if cap(b) < n {
+		return make([]byte, n)
+	}
+	return b[:n]
+}