leveldb: flush memdb only to level-0 (fixes #127)
Picking a level other than zero can cause a compaction issue with
large bulk inserts and deletes on a strictly incrementing key-space.
The problem is that small deletion markers get trapped at a lower
level, while key/value entries keep growing at a higher level. Since
the key-space is strictly incrementing it will not overlap with a
higher level, the maximum possible level is always picked, and the
overlapping deletion markers are pushed into a lower level.
diff --git a/leveldb/db.go b/leveldb/db.go
index 88a3e0d..0458f55 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -71,6 +71,7 @@
compErrSetC chan error
compWriteLocking bool
compStats []cStats
+ memdbMaxLevel int // For testing.
// Close.
closeW sync.WaitGroup
@@ -527,7 +528,7 @@
// Flush memdb and remove obsolete journal file.
if of != nil {
if mdb.Len() > 0 {
- if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
return err
}
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 8b8dc4a..a774c8a 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -271,7 +271,7 @@
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
- flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1)
+ flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
stats.stopTimer()
return
}, func() error {
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 9d91ebf..ba37422 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -430,7 +430,8 @@
}
return
}
-func (h *dbHarness) tablesPerLevel(want string) {
+
+func (h *dbHarness) getTablesPerLevel() string {
res := ""
nz := 0
v := h.db.s.version()
@@ -444,7 +445,11 @@
}
}
v.release()
- res = res[:nz]
+ return res[:nz]
+}
+
+func (h *dbHarness) tablesPerLevel(want string) {
+ res := h.getTablesPerLevel()
if res != want {
h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
}
@@ -660,6 +665,8 @@
func TestDB_GetLevel0Ordering(t *testing.T) {
trun(t, func(h *dbHarness) {
+ h.db.memdbMaxLevel = 2
+
for i := 0; i < 4; i++ {
h.put("bar", fmt.Sprintf("b%d", i))
h.put("foo", fmt.Sprintf("v%d", i))
@@ -719,6 +726,8 @@
func TestDB_GetEncountersEmptyLevel(t *testing.T) {
trun(t, func(h *dbHarness) {
+ h.db.memdbMaxLevel = 2
+
// Arrange for the following to happen:
// * sstable A in level 0
// * nothing in level 1
@@ -1192,9 +1201,11 @@
trun(t, func(h *dbHarness) {
s := h.db.s
+ m := 2
+ h.db.memdbMaxLevel = m
+
h.put("foo", "v1")
h.compactMem()
- m := h.o.GetMaxMemCompationLevel()
v := s.version()
num := v.tLen(m)
v.release()
@@ -1236,9 +1247,11 @@
defer h.close()
s := h.db.s
+ m := 2
+ h.db.memdbMaxLevel = m
+
h.put("foo", "v1")
h.compactMem()
- m := h.o.GetMaxMemCompationLevel()
v := s.version()
num := v.tLen(m)
v.release()
@@ -1276,6 +1289,8 @@
h := newDbHarnessWopt(t, &opt.Options{OpenFilesCacheCapacity: -1})
defer h.close()
+ h.db.memdbMaxLevel = 2
+
im := 10
jm := 10
for r := 0; r < 2; r++ {
@@ -1288,7 +1303,7 @@
}
if n := h.totalTables(); n != im*2 {
- t.Errorf("total tables is %d, want %d", n, im)
+ t.Errorf("total tables is %d, want %d", n, im*2)
}
h.stor.SetEmuErr(storage.TypeTable, tsOpOpen)
@@ -1309,9 +1324,7 @@
func TestDB_OverlapInLevel0(t *testing.T) {
trun(t, func(h *dbHarness) {
- if h.o.GetMaxMemCompationLevel() != 2 {
- t.Fatal("fix test to reflect the config")
- }
+ h.db.memdbMaxLevel = 2
// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
h.put("100", "v100")
@@ -1429,7 +1442,7 @@
h.compactMem()
h.getVal("foo", "bar")
v := h.db.s.version()
- if n := v.tLen(h.o.GetMaxMemCompationLevel()); n != 1 {
+ if n := v.tLen(0); n != 1 {
t.Errorf("invalid total tables, want=1 got=%d", n)
}
v.release()
@@ -1441,7 +1454,7 @@
}
// Merging compaction (will fail)
- h.compactRangeAtErr(h.o.GetMaxMemCompationLevel(), "", "", true)
+ h.compactRangeAtErr(0, "", "", true)
h.db.Close()
h.stor.SetEmuErr(0, tsOpWrite)
@@ -1595,9 +1608,7 @@
h := newDbHarness(t)
defer h.close()
- if h.o.GetMaxMemCompationLevel() != 2 {
- t.Fatal("fix test to reflect the config")
- }
+ h.db.memdbMaxLevel = 2
h.putMulti(3, "p", "q")
h.tablesPerLevel("1,1,1")
@@ -2594,6 +2605,8 @@
})
defer h.close()
+ h.db.memdbMaxLevel = 2
+
key := func(x int) string {
return fmt.Sprintf("v%06d", x)
}
@@ -2699,3 +2712,33 @@
ro("bar", "vx", "v2")
h.assertNumKeys(4)
}
+
+func TestDB_BulkInsertDelete(t *testing.T) {
+ h := newDbHarnessWopt(t, &opt.Options{
+ Compression: opt.NoCompression,
+ CompactionTableSize: 128 * opt.KiB,
+ CompactionTotalSize: 1 * opt.MiB,
+ WriteBuffer: 256 * opt.KiB,
+ })
+ defer h.close()
+
+ const R = 100
+ const N = 2500
+ key := make([]byte, 4)
+ value := make([]byte, 256)
+ for i := 0; i < R; i++ {
+ offset := N * i
+ for j := 0; j < N; j++ {
+ binary.BigEndian.PutUint32(key, uint32(offset+j))
+ h.db.Put(key, value, nil)
+ }
+ for j := 0; j < N; j++ {
+ binary.BigEndian.PutUint32(key, uint32(offset+j))
+ h.db.Delete(key, nil)
+ }
+ }
+
+ if tot := h.totalTables(); tot > 10 {
+ t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
+ }
+}
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 7b5d8b9..25313da 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -35,7 +35,6 @@
DefaultCompactionTotalSizeMultiplier = 10.0
DefaultCompressionType = SnappyCompression
DefaultIteratorSamplingRate = 1 * MiB
- DefaultMaxMemCompationLevel = 2
DefaultNumLevel = 7
DefaultOpenFilesCacher = LRUCacher
DefaultOpenFilesCacheCapacity = 500
@@ -301,13 +300,6 @@
// The default is 1MiB.
IteratorSamplingRate int
- // MaxMemCompationLevel defines maximum level a newly compacted 'memdb'
- // will be pushed into if doesn't creates overlap. This should less than
- // NumLevel. Use -1 for level-0.
- //
- // The default is 2.
- MaxMemCompationLevel int
-
// NoSync allows completely disable fsync.
//
// The default is false.
@@ -536,21 +528,6 @@
return o.IteratorSamplingRate
}
-func (o *Options) GetMaxMemCompationLevel() int {
- level := DefaultMaxMemCompationLevel
- if o != nil {
- if o.MaxMemCompationLevel > 0 {
- level = o.MaxMemCompationLevel
- } else if o.MaxMemCompationLevel < 0 {
- level = 0
- }
- }
- if level >= o.GetNumLevel() {
- return o.GetNumLevel() - 1
- }
- return level
-}
-
func (o *Options) GetNoSync() bool {
if o == nil {
return false
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
index 36aa1d8..b99e86d 100644
--- a/leveldb/session_compaction.go
+++ b/leveldb/session_compaction.go
@@ -14,25 +14,30 @@
"github.com/syndtr/goleveldb/leveldb/opt"
)
-func (s *session) pickMemdbLevel(umin, umax []byte) int {
+func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
v := s.version()
defer v.release()
- return v.pickMemdbLevel(umin, umax)
+ return v.pickMemdbLevel(umin, umax, maxLevel)
}
-func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) {
+func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (level int, err error) {
// Create sorted table.
iter := mdb.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
- return level, err
+ return
}
- // Pick level and add to record.
- if level < 0 {
- level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey())
- }
+ // Picking a level other than zero can cause a compaction issue with
+ // large bulk inserts and deletes on a strictly incrementing key-space.
+ // The problem is that small deletion markers get trapped at a lower
+ // level, while key/value entries keep growing at a higher level. Since
+ // the key-space is strictly incrementing it will not overlap with a
+ // higher level, the maximum possible level is always picked, and the
+ // overlapping deletion markers are pushed into a lower level.
+ // See: https://github.com/syndtr/goleveldb/issues/127.
+ level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
rec.addTableFile(level, t)
s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
@@ -210,7 +215,8 @@
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
- for level, tables := range c.v.tables[c.level+2:] {
+ for level := c.level + 2; level < len(c.v.tables); level++ {
+ tables := c.v.tables[level]
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
diff --git a/leveldb/version.go b/leveldb/version.go
index 7227de2..d6497dd 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -300,21 +300,21 @@
return
}
-func (v *version) pickMemdbLevel(umin, umax []byte) (level int) {
- if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
- var overlaps tFiles
- maxLevel := v.s.o.GetMaxMemCompationLevel()
- for ; level < maxLevel; level++ {
- if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
- break
- }
- overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
- if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
- break
+func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
+ if maxLevel > 0 {
+ if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
+ var overlaps tFiles
+ for ; level < maxLevel; level++ {
+ if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
+ break
+ }
+ overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
+ if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
+ break
+ }
}
}
}
-
return
}