leveldb: switch to transaction mode when writing large batch #129
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 2965b7f..e37cbc9 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -74,7 +74,7 @@
 }
 
 func newDbHarness(t *testing.T) *dbHarness {
-	return newDbHarnessWopt(t, &opt.Options{})
+	return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
 }
 
 func (h *dbHarness) init(t *testing.T, o *opt.Options) {
@@ -514,7 +514,7 @@
 			case 0:
 			case 1:
 				if o == nil {
-					o = &opt.Options{Filter: _bloom_filter}
+					o = &opt.Options{DisableLargeBatchTransaction: true, Filter: _bloom_filter}
 				} else {
 					old := o
 					o = &opt.Options{}
@@ -523,7 +523,7 @@
 				}
 			case 2:
 				if o == nil {
-					o = &opt.Options{Compression: opt.NoCompression}
+					o = &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression}
 				} else {
 					old := o
 					o = &opt.Options{}
@@ -621,7 +621,10 @@
 }
 
 func TestDB_GetFromFrozen(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100100})
+	h := newDbHarnessWopt(t, &opt.Options{
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  100100,
+	})
 	defer h.close()
 
 	h.put("foo", "v1")
@@ -893,7 +896,7 @@
 }
 
 func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
-	truno(t, &opt.Options{WriteBuffer: 1000000}, func(h *dbHarness) {
+	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {
 
 		h.stor.Stall(testutil.ModeSync, storage.TypeTable)
 		h.put("big1", strings.Repeat("x", 10000000))
@@ -909,7 +912,7 @@
 }
 
 func TestDB_MinorCompactionsHappen(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 10000})
+	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
 	defer h.close()
 
 	n := 500
@@ -959,8 +962,9 @@
 
 func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		WriteBuffer: 10000000,
-		Compression: opt.NoCompression,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  10000000,
+		Compression:                  opt.NoCompression,
 	})
 	defer h.close()
 
@@ -996,7 +1000,7 @@
 }
 
 func TestDB_RepeatedWritesToSameKey(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
+	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
 	defer h.close()
 
 	maxTables := h.o.GetWriteL0PauseTrigger() + 7
@@ -1012,7 +1016,10 @@
 }
 
 func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
+	h := newDbHarnessWopt(t, &opt.Options{
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  100000,
+	})
 	defer h.close()
 
 	h.reopenDB()
@@ -1030,7 +1037,7 @@
 }
 
 func TestDB_SparseMerge(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
 	defer h.close()
 
 	h.putMulti(7, "A", "Z")
@@ -1069,8 +1076,9 @@
 
 func TestDB_SizeOf(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		Compression: opt.NoCompression,
-		WriteBuffer: 10000000,
+		DisableLargeBatchTransaction: true,
+		Compression:                  opt.NoCompression,
+		WriteBuffer:                  10000000,
 	})
 	defer h.close()
 
@@ -1118,7 +1126,10 @@
 }
 
 func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+	h := newDbHarnessWopt(t, &opt.Options{
+		DisableLargeBatchTransaction: true,
+		Compression:                  opt.NoCompression,
+	})
 	defer h.close()
 
 	sizes := []uint64{
@@ -1311,7 +1322,10 @@
 }
 
 func TestDB_CompactionTableOpenError(t *testing.T) {
-	h := newDbHarnessWopt(t, &opt.Options{OpenFilesCacheCapacity: -1})
+	h := newDbHarnessWopt(t, &opt.Options{
+		DisableLargeBatchTransaction: true,
+		OpenFilesCacheCapacity:       -1,
+	})
 	defer h.close()
 
 	h.db.memdbMaxLevel = 2
@@ -1601,8 +1615,9 @@
 
 func TestDB_CustomComparer(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		Comparer:    numberComparer{},
-		WriteBuffer: 1000,
+		DisableLargeBatchTransaction: true,
+		Comparer:                     numberComparer{},
+		WriteBuffer:                  1000,
 	})
 	defer h.close()
 
@@ -1667,8 +1682,9 @@
 
 func TestDB_BloomFilter(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		DisableBlockCache: true,
-		Filter:            filter.NewBloomFilter(10),
+		DisableLargeBatchTransaction: true,
+		DisableBlockCache:            true,
+		Filter:                       filter.NewBloomFilter(10),
 	})
 	defer h.close()
 
@@ -1786,7 +1802,7 @@
 	const n, n2 = 4, 4000
 
 	runtime.GOMAXPROCS(n*2 + 2)
-	truno(t, &opt.Options{WriteBuffer: 30}, func(h *dbHarness) {
+	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) {
 		var closeWg sync.WaitGroup
 		var stop uint32
 
@@ -1924,7 +1940,10 @@
 
 	// Disable compression since it affects the creation of layers and the
 	// code below is trying to test against a very specific scenario.
-	h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+	h := newDbHarnessWopt(t, &opt.Options{
+		DisableLargeBatchTransaction: true,
+		Compression:                  opt.NoCompression,
+	})
 	defer h.close()
 
 	// Create first key range.
@@ -1985,7 +2004,8 @@
 
 func TestDB_GoleveldbIssue74(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		WriteBuffer: 1 * opt.MiB,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  1 * opt.MiB,
 	})
 	defer h.close()
 
@@ -2103,8 +2123,9 @@
 
 func TestDB_GoleveldbIssue72and83(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		WriteBuffer:            1 * opt.MiB,
-		OpenFilesCacheCapacity: 3,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  1 * opt.MiB,
+		OpenFilesCacheCapacity:       3,
 	})
 	defer h.close()
 
@@ -2236,9 +2257,10 @@
 
 func TestDB_TransientError(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		WriteBuffer:              128 * opt.KiB,
-		OpenFilesCacheCapacity:   3,
-		DisableCompactionBackoff: true,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  128 * opt.KiB,
+		OpenFilesCacheCapacity:       3,
+		DisableCompactionBackoff:     true,
 	})
 	defer h.close()
 
@@ -2349,9 +2371,10 @@
 
 func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		WriteBuffer:                 112 * opt.KiB,
-		CompactionTableSize:         90 * opt.KiB,
-		CompactionExpandLimitFactor: 1,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  112 * opt.KiB,
+		CompactionTableSize:          90 * opt.KiB,
+		CompactionExpandLimitFactor:  1,
 	})
 	defer h.close()
 
@@ -2446,11 +2469,12 @@
 	const nSeq = 99
 
 	o := &opt.Options{
-		WriteBuffer:                 112 * opt.KiB,
-		CompactionTableSize:         43 * opt.KiB,
-		CompactionExpandLimitFactor: 1,
-		CompactionGPOverlapsFactor:  1,
-		DisableBlockCache:           true,
+		DisableLargeBatchTransaction: true,
+		WriteBuffer:                  112 * opt.KiB,
+		CompactionTableSize:          43 * opt.KiB,
+		CompactionExpandLimitFactor:  1,
+		CompactionGPOverlapsFactor:   1,
+		DisableBlockCache:            true,
 	}
 	s, err := newSession(stor, o)
 	if err != nil {
@@ -2626,8 +2650,9 @@
 	)
 
 	h := newDbHarnessWopt(t, &opt.Options{
-		Compression:       opt.NoCompression,
-		DisableBlockCache: true,
+		DisableLargeBatchTransaction: true,
+		Compression:                  opt.NoCompression,
+		DisableBlockCache:            true,
 	})
 	defer h.close()
 
@@ -2742,10 +2767,11 @@
 
 func TestDB_BulkInsertDelete(t *testing.T) {
 	h := newDbHarnessWopt(t, &opt.Options{
-		Compression:         opt.NoCompression,
-		CompactionTableSize: 128 * opt.KiB,
-		CompactionTotalSize: 1 * opt.MiB,
-		WriteBuffer:         256 * opt.KiB,
+		DisableLargeBatchTransaction: true,
+		Compression:                  opt.NoCompression,
+		CompactionTableSize:          128 * opt.KiB,
+		CompactionTotalSize:          1 * opt.MiB,
+		WriteBuffer:                  256 * opt.KiB,
 	})
 	defer h.close()
 
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index 293e265..5200be6 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -135,6 +135,19 @@
 
 	b.init(wo.GetSync() && !db.s.o.GetNoSync())
 
+	if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+		// Writes using transaction.
+		tr, err1 := db.OpenTransaction()
+		if err1 != nil {
+			return err1
+		}
+		if err1 := tr.Write(b, wo); err1 != nil {
+			tr.Discard()
+			return err1
+		}
+		return tr.Commit()
+	}
+
 	// The write happen synchronously.
 	select {
 	case db.writeC <- b:
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 190a5c0..3d2bf1c 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -8,10 +8,11 @@
 package opt
 
 import (
+	"math"
+
 	"github.com/syndtr/goleveldb/leveldb/cache"
 	"github.com/syndtr/goleveldb/leveldb/comparer"
 	"github.com/syndtr/goleveldb/leveldb/filter"
-	"math"
 )
 
 const (
@@ -264,6 +265,13 @@
 	// The default value is false.
 	DisableCompactionBackoff bool
 
+	// DisableLargeBatchTransaction allows disabling switch-to-transaction mode
+	// on large batch writes. If enabled, batch writes larger than WriteBuffer
+	// will use a transaction.
+	//
+	// The default value is false.
+	DisableLargeBatchTransaction bool
+
 	// ErrorIfExist defines whether an error should returned if the DB already
 	// exist.
 	//
@@ -493,6 +501,13 @@
 	return o.DisableCompactionBackoff
 }
 
+func (o *Options) GetDisableLargeBatchTransaction() bool {
+	if o == nil {
+		return false
+	}
+	return o.DisableLargeBatchTransaction
+}
+
 func (o *Options) GetErrorIfExist() bool {
 	if o == nil {
 		return false
diff --git a/manualtest/dbstress/main.go b/manualtest/dbstress/main.go
index c49900c..7fe639d 100644
--- a/manualtest/dbstress/main.go
+++ b/manualtest/dbstress/main.go
@@ -163,7 +163,10 @@
 		log.Fatal(err)
 	}
 
-	o := &opt.Options{Strict: opt.NoStrict}
+	o := &opt.Options{
+		DisableLargeBatchTransaction: true,
+		Strict: opt.NoStrict,
+	}
 	if checksum {
 		o.Strict = opt.StrictBlockChecksum | opt.StrictReader
 	}