leveldb: switch to transaction mode when writing large batch #129
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 2965b7f..e37cbc9 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -74,7 +74,7 @@
}
func newDbHarness(t *testing.T) *dbHarness {
- return newDbHarnessWopt(t, &opt.Options{})
+ return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
}
func (h *dbHarness) init(t *testing.T, o *opt.Options) {
@@ -514,7 +514,7 @@
case 0:
case 1:
if o == nil {
- o = &opt.Options{Filter: _bloom_filter}
+ o = &opt.Options{DisableLargeBatchTransaction: true, Filter: _bloom_filter}
} else {
old := o
o = &opt.Options{}
@@ -523,7 +523,7 @@
}
case 2:
if o == nil {
- o = &opt.Options{Compression: opt.NoCompression}
+ o = &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression}
} else {
old := o
o = &opt.Options{}
@@ -621,7 +621,10 @@
}
func TestDB_GetFromFrozen(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100100})
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 100100,
+ })
defer h.close()
h.put("foo", "v1")
@@ -893,7 +896,7 @@
}
func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
- truno(t, &opt.Options{WriteBuffer: 1000000}, func(h *dbHarness) {
+ truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {
h.stor.Stall(testutil.ModeSync, storage.TypeTable)
h.put("big1", strings.Repeat("x", 10000000))
@@ -909,7 +912,7 @@
}
func TestDB_MinorCompactionsHappen(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 10000})
+ h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
defer h.close()
n := 500
@@ -959,8 +962,9 @@
func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- WriteBuffer: 10000000,
- Compression: opt.NoCompression,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 10000000,
+ Compression: opt.NoCompression,
})
defer h.close()
@@ -996,7 +1000,7 @@
}
func TestDB_RepeatedWritesToSameKey(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
+ h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
defer h.close()
maxTables := h.o.GetWriteL0PauseTrigger() + 7
@@ -1012,7 +1016,10 @@
}
func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 100000,
+ })
defer h.close()
h.reopenDB()
@@ -1030,7 +1037,7 @@
}
func TestDB_SparseMerge(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+ h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
defer h.close()
h.putMulti(7, "A", "Z")
@@ -1069,8 +1076,9 @@
func TestDB_SizeOf(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- Compression: opt.NoCompression,
- WriteBuffer: 10000000,
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ WriteBuffer: 10000000,
})
defer h.close()
@@ -1118,7 +1126,10 @@
}
func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ })
defer h.close()
sizes := []uint64{
@@ -1311,7 +1322,10 @@
}
func TestDB_CompactionTableOpenError(t *testing.T) {
- h := newDbHarnessWopt(t, &opt.Options{OpenFilesCacheCapacity: -1})
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ OpenFilesCacheCapacity: -1,
+ })
defer h.close()
h.db.memdbMaxLevel = 2
@@ -1601,8 +1615,9 @@
func TestDB_CustomComparer(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- Comparer: numberComparer{},
- WriteBuffer: 1000,
+ DisableLargeBatchTransaction: true,
+ Comparer: numberComparer{},
+ WriteBuffer: 1000,
})
defer h.close()
@@ -1667,8 +1682,9 @@
func TestDB_BloomFilter(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- DisableBlockCache: true,
- Filter: filter.NewBloomFilter(10),
+ DisableLargeBatchTransaction: true,
+ DisableBlockCache: true,
+ Filter: filter.NewBloomFilter(10),
})
defer h.close()
@@ -1786,7 +1802,7 @@
const n, n2 = 4, 4000
runtime.GOMAXPROCS(n*2 + 2)
- truno(t, &opt.Options{WriteBuffer: 30}, func(h *dbHarness) {
+ truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) {
var closeWg sync.WaitGroup
var stop uint32
@@ -1924,7 +1940,10 @@
// Disable compression since it affects the creation of layers and the
// code below is trying to test against a very specific scenario.
- h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ })
defer h.close()
// Create first key range.
@@ -1985,7 +2004,8 @@
func TestDB_GoleveldbIssue74(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- WriteBuffer: 1 * opt.MiB,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 1 * opt.MiB,
})
defer h.close()
@@ -2103,8 +2123,9 @@
func TestDB_GoleveldbIssue72and83(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- WriteBuffer: 1 * opt.MiB,
- OpenFilesCacheCapacity: 3,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 1 * opt.MiB,
+ OpenFilesCacheCapacity: 3,
})
defer h.close()
@@ -2236,9 +2257,10 @@
func TestDB_TransientError(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- WriteBuffer: 128 * opt.KiB,
- OpenFilesCacheCapacity: 3,
- DisableCompactionBackoff: true,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 128 * opt.KiB,
+ OpenFilesCacheCapacity: 3,
+ DisableCompactionBackoff: true,
})
defer h.close()
@@ -2349,9 +2371,10 @@
func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- WriteBuffer: 112 * opt.KiB,
- CompactionTableSize: 90 * opt.KiB,
- CompactionExpandLimitFactor: 1,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 112 * opt.KiB,
+ CompactionTableSize: 90 * opt.KiB,
+ CompactionExpandLimitFactor: 1,
})
defer h.close()
@@ -2446,11 +2469,12 @@
const nSeq = 99
o := &opt.Options{
- WriteBuffer: 112 * opt.KiB,
- CompactionTableSize: 43 * opt.KiB,
- CompactionExpandLimitFactor: 1,
- CompactionGPOverlapsFactor: 1,
- DisableBlockCache: true,
+ DisableLargeBatchTransaction: true,
+ WriteBuffer: 112 * opt.KiB,
+ CompactionTableSize: 43 * opt.KiB,
+ CompactionExpandLimitFactor: 1,
+ CompactionGPOverlapsFactor: 1,
+ DisableBlockCache: true,
}
s, err := newSession(stor, o)
if err != nil {
@@ -2626,8 +2650,9 @@
)
h := newDbHarnessWopt(t, &opt.Options{
- Compression: opt.NoCompression,
- DisableBlockCache: true,
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ DisableBlockCache: true,
})
defer h.close()
@@ -2742,10 +2767,11 @@
func TestDB_BulkInsertDelete(t *testing.T) {
h := newDbHarnessWopt(t, &opt.Options{
- Compression: opt.NoCompression,
- CompactionTableSize: 128 * opt.KiB,
- CompactionTotalSize: 1 * opt.MiB,
- WriteBuffer: 256 * opt.KiB,
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ CompactionTableSize: 128 * opt.KiB,
+ CompactionTotalSize: 1 * opt.MiB,
+ WriteBuffer: 256 * opt.KiB,
})
defer h.close()
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index 293e265..5200be6 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -135,6 +135,19 @@
b.init(wo.GetSync() && !db.s.o.GetNoSync())
+ if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
+ // Writes using transaction.
+ tr, err1 := db.OpenTransaction()
+ if err1 != nil {
+ return err1
+ }
+ if err1 := tr.Write(b, wo); err1 != nil {
+ tr.Discard()
+ return err1
+ }
+ return tr.Commit()
+ }
+
// The write happen synchronously.
select {
case db.writeC <- b:
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 190a5c0..3d2bf1c 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -8,10 +8,11 @@
package opt
import (
+ "math"
+
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/filter"
- "math"
)
const (
@@ -264,6 +265,13 @@
// The default value is false.
DisableCompactionBackoff bool
+ // DisableLargeBatchTransaction allows disabling switch-to-transaction mode
+ // on large batch write. If enable batch writes large than WriteBuffer will
+ // use transaction.
+ //
+ // The default is false.
+ DisableLargeBatchTransaction bool
+
// ErrorIfExist defines whether an error should returned if the DB already
// exist.
//
@@ -493,6 +501,13 @@
return o.DisableCompactionBackoff
}
+func (o *Options) GetDisableLargeBatchTransaction() bool {
+ if o == nil {
+ return false
+ }
+ return o.DisableLargeBatchTransaction
+}
+
func (o *Options) GetErrorIfExist() bool {
if o == nil {
return false
diff --git a/manualtest/dbstress/main.go b/manualtest/dbstress/main.go
index c49900c..7fe639d 100644
--- a/manualtest/dbstress/main.go
+++ b/manualtest/dbstress/main.go
@@ -163,7 +163,10 @@
log.Fatal(err)
}
- o := &opt.Options{Strict: opt.NoStrict}
+ o := &opt.Options{
+ DisableLargeBatchTransaction: true,
+ Strict: opt.NoStrict,
+ }
if checksum {
o.Strict = opt.StrictBlockChecksum | opt.StrictReader
}