leveldb: flush memdb only to level-0 (fixes #127)

Picking a level other than zero can cause a compaction issue with
large bulk inserts and deletes on a strictly incrementing key-space.
The problem is that small deletion markers get trapped at a lower
level, while key/value entries keep growing at a higher level. Since
the key-space is strictly incrementing it will not overlap with a
higher level, thus the maximum possible level is always picked and
the overlapping deletion markers are pushed into a lower level.
diff --git a/leveldb/db.go b/leveldb/db.go
index 88a3e0d..0458f55 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -71,6 +71,7 @@
 	compErrSetC      chan error
 	compWriteLocking bool
 	compStats        []cStats
+	memdbMaxLevel    int // For testing.
 
 	// Close.
 	closeW sync.WaitGroup
@@ -527,7 +528,7 @@
 			// Flush memdb and remove obsolete journal file.
 			if of != nil {
 				if mdb.Len() > 0 {
-					if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
+					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
 						fr.Close()
 						return err
 					}
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 8b8dc4a..a774c8a 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -271,7 +271,7 @@
 
 	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
 		stats.startTimer()
-		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1)
+		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
 		stats.stopTimer()
 		return
 	}, func() error {
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 9d91ebf..ba37422 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -430,7 +430,8 @@
 	}
 	return
 }
-func (h *dbHarness) tablesPerLevel(want string) {
+
+func (h *dbHarness) getTablesPerLevel() string {
 	res := ""
 	nz := 0
 	v := h.db.s.version()
@@ -444,7 +445,11 @@
 		}
 	}
 	v.release()
-	res = res[:nz]
+	return res[:nz]
+}
+
+func (h *dbHarness) tablesPerLevel(want string) {
+	res := h.getTablesPerLevel()
 	if res != want {
 		h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
 	}
@@ -660,6 +665,8 @@
 
 func TestDB_GetLevel0Ordering(t *testing.T) {
 	trun(t, func(h *dbHarness) {
+		h.db.memdbMaxLevel = 2
+
 		for i := 0; i < 4; i++ {
 			h.put("bar", fmt.Sprintf("b%d", i))
 			h.put("foo", fmt.Sprintf("v%d", i))
@@ -719,6 +726,8 @@
 
 func TestDB_GetEncountersEmptyLevel(t *testing.T) {
 	trun(t, func(h *dbHarness) {
+		h.db.memdbMaxLevel = 2
+
 		// Arrange for the following to happen:
 		//   * sstable A in level 0
 		//   * nothing in level 1
@@ -1192,9 +1201,11 @@
 	trun(t, func(h *dbHarness) {
 		s := h.db.s
 
+		m := 2
+		h.db.memdbMaxLevel = m
+
 		h.put("foo", "v1")
 		h.compactMem()
-		m := h.o.GetMaxMemCompationLevel()
 		v := s.version()
 		num := v.tLen(m)
 		v.release()
@@ -1236,9 +1247,11 @@
 	defer h.close()
 	s := h.db.s
 
+	m := 2
+	h.db.memdbMaxLevel = m
+
 	h.put("foo", "v1")
 	h.compactMem()
-	m := h.o.GetMaxMemCompationLevel()
 	v := s.version()
 	num := v.tLen(m)
 	v.release()
@@ -1276,6 +1289,8 @@
 	h := newDbHarnessWopt(t, &opt.Options{OpenFilesCacheCapacity: -1})
 	defer h.close()
 
+	h.db.memdbMaxLevel = 2
+
 	im := 10
 	jm := 10
 	for r := 0; r < 2; r++ {
@@ -1288,7 +1303,7 @@
 	}
 
 	if n := h.totalTables(); n != im*2 {
-		t.Errorf("total tables is %d, want %d", n, im)
+		t.Errorf("total tables is %d, want %d", n, im*2)
 	}
 
 	h.stor.SetEmuErr(storage.TypeTable, tsOpOpen)
@@ -1309,9 +1324,7 @@
 
 func TestDB_OverlapInLevel0(t *testing.T) {
 	trun(t, func(h *dbHarness) {
-		if h.o.GetMaxMemCompationLevel() != 2 {
-			t.Fatal("fix test to reflect the config")
-		}
+		h.db.memdbMaxLevel = 2
 
 		// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
 		h.put("100", "v100")
@@ -1429,7 +1442,7 @@
 			h.compactMem()
 			h.getVal("foo", "bar")
 			v := h.db.s.version()
-			if n := v.tLen(h.o.GetMaxMemCompationLevel()); n != 1 {
+			if n := v.tLen(0); n != 1 {
 				t.Errorf("invalid total tables, want=1 got=%d", n)
 			}
 			v.release()
@@ -1441,7 +1454,7 @@
 			}
 
 			// Merging compaction (will fail)
-			h.compactRangeAtErr(h.o.GetMaxMemCompationLevel(), "", "", true)
+			h.compactRangeAtErr(0, "", "", true)
 
 			h.db.Close()
 			h.stor.SetEmuErr(0, tsOpWrite)
@@ -1595,9 +1608,7 @@
 	h := newDbHarness(t)
 	defer h.close()
 
-	if h.o.GetMaxMemCompationLevel() != 2 {
-		t.Fatal("fix test to reflect the config")
-	}
+	h.db.memdbMaxLevel = 2
 
 	h.putMulti(3, "p", "q")
 	h.tablesPerLevel("1,1,1")
@@ -2594,6 +2605,8 @@
 	})
 	defer h.close()
 
+	h.db.memdbMaxLevel = 2
+
 	key := func(x int) string {
 		return fmt.Sprintf("v%06d", x)
 	}
@@ -2699,3 +2712,33 @@
 	ro("bar", "vx", "v2")
 	h.assertNumKeys(4)
 }
+
+func TestDB_BulkInsertDelete(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		Compression:         opt.NoCompression,
+		CompactionTableSize: 128 * opt.KiB,
+		CompactionTotalSize: 1 * opt.MiB,
+		WriteBuffer:         256 * opt.KiB,
+	})
+	defer h.close()
+
+	const R = 100
+	const N = 2500
+	key := make([]byte, 4)
+	value := make([]byte, 256)
+	for i := 0; i < R; i++ {
+		offset := N * i
+		for j := 0; j < N; j++ {
+			binary.BigEndian.PutUint32(key, uint32(offset+j))
+			h.db.Put(key, value, nil)
+		}
+		for j := 0; j < N; j++ {
+			binary.BigEndian.PutUint32(key, uint32(offset+j))
+			h.db.Delete(key, nil)
+		}
+	}
+
+	if tot := h.totalTables(); tot > 10 {
+		t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
+	}
+}
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 7b5d8b9..25313da 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -35,7 +35,6 @@
 	DefaultCompactionTotalSizeMultiplier = 10.0
 	DefaultCompressionType               = SnappyCompression
 	DefaultIteratorSamplingRate          = 1 * MiB
-	DefaultMaxMemCompationLevel          = 2
 	DefaultNumLevel                      = 7
 	DefaultOpenFilesCacher               = LRUCacher
 	DefaultOpenFilesCacheCapacity        = 500
@@ -301,13 +300,6 @@
 	// The default is 1MiB.
 	IteratorSamplingRate int
 
-	// MaxMemCompationLevel defines maximum level a newly compacted 'memdb'
-	// will be pushed into if doesn't creates overlap. This should less than
-	// NumLevel. Use -1 for level-0.
-	//
-	// The default is 2.
-	MaxMemCompationLevel int
-
 	// NoSync allows completely disable fsync.
 	//
 	// The default is false.
@@ -536,21 +528,6 @@
 	return o.IteratorSamplingRate
 }
 
-func (o *Options) GetMaxMemCompationLevel() int {
-	level := DefaultMaxMemCompationLevel
-	if o != nil {
-		if o.MaxMemCompationLevel > 0 {
-			level = o.MaxMemCompationLevel
-		} else if o.MaxMemCompationLevel < 0 {
-			level = 0
-		}
-	}
-	if level >= o.GetNumLevel() {
-		return o.GetNumLevel() - 1
-	}
-	return level
-}
-
 func (o *Options) GetNoSync() bool {
 	if o == nil {
 		return false
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
index 36aa1d8..b99e86d 100644
--- a/leveldb/session_compaction.go
+++ b/leveldb/session_compaction.go
@@ -14,25 +14,30 @@
 	"github.com/syndtr/goleveldb/leveldb/opt"
 )
 
-func (s *session) pickMemdbLevel(umin, umax []byte) int {
+func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
 	v := s.version()
 	defer v.release()
-	return v.pickMemdbLevel(umin, umax)
+	return v.pickMemdbLevel(umin, umax, maxLevel)
 }
 
-func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) {
+func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (level int, err error) {
 	// Create sorted table.
 	iter := mdb.NewIterator(nil)
 	defer iter.Release()
 	t, n, err := s.tops.createFrom(iter)
 	if err != nil {
-		return level, err
+		return
 	}
 
-	// Pick level and add to record.
-	if level < 0 {
-		level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey())
-	}
+	// Picking a level other than zero can cause a compaction issue with
+	// large bulk inserts and deletes on a strictly incrementing key-space.
+	// The problem is that small deletion markers get trapped at a lower
+	// level, while key/value entries keep growing at a higher level. Since
+	// the key-space is strictly incrementing it will not overlap with a
+	// higher level, thus the maximum possible level is always picked and
+	// the overlapping deletion markers are pushed into a lower level.
+	// See: https://github.com/syndtr/goleveldb/issues/127.
+	level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
 	rec.addTableFile(level, t)
 
 	s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
@@ -210,7 +215,8 @@
 }
 
 func (c *compaction) baseLevelForKey(ukey []byte) bool {
-	for level, tables := range c.v.tables[c.level+2:] {
+	for level := c.level + 2; level < len(c.v.tables); level++ {
+		tables := c.v.tables[level]
 		for c.tPtrs[level] < len(tables) {
 			t := tables[c.tPtrs[level]]
 			if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
diff --git a/leveldb/version.go b/leveldb/version.go
index 7227de2..d6497dd 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -300,21 +300,21 @@
 	return
 }
 
-func (v *version) pickMemdbLevel(umin, umax []byte) (level int) {
-	if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
-		var overlaps tFiles
-		maxLevel := v.s.o.GetMaxMemCompationLevel()
-		for ; level < maxLevel; level++ {
-			if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
-				break
-			}
-			overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
-			if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
-				break
+func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
+	if maxLevel > 0 {
+		if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
+			var overlaps tFiles
+			for ; level < maxLevel; level++ {
+				if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
+					break
+				}
+				overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
+				if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
+					break
+				}
 			}
 		}
 	}
-
 	return
 }