leveldb: allow iterator to trigger compaction

This implements the fix introduced in LevelDB 1.13, addressing an issue
where a large contiguous keyspace of deleted data was not getting
compacted.

Reported at syncthing/syncthing#1480.
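
For illustration, a minimal sketch of how the new IteratorSamplingRate
option (added in leveldb/opt/options.go below) might be tuned when opening
a database; the path and the 2 MiB value are arbitrary examples, and
leaving the field at zero falls back to the 1 MiB default:

    package main

    import (
    	"log"

    	"github.com/syndtr/goleveldb/leveldb"
    	"github.com/syndtr/goleveldb/leveldb/opt"
    )

    func main() {
    	db, err := leveldb.OpenFile("/tmp/sampledb", &opt.Options{
    		// Approximate number of bytes an iterator reads between
    		// seek samples; samples may schedule a table compaction.
    		IteratorSamplingRate: 2 * opt.MiB,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	// Scanning a range of (possibly deleted) keys now contributes
    	// read samples that can trigger compaction of the touched tables.
    	iter := db.NewIterator(nil, nil)
    	for iter.Next() {
    	}
    	iter.Release()
    	if err := iter.Error(); err != nil {
    		log.Fatal(err)
    	}
    }
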
diff --git a/leveldb/db_iter.go b/leveldb/db_iter.go
index 4607e5d..011a94a 100644
--- a/leveldb/db_iter.go
+++ b/leveldb/db_iter.go
@@ -8,6 +8,7 @@
 
 import (
 	"errors"
+	"math/rand"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -80,6 +81,10 @@
 	return iter
 }
 
+func (db *DB) iterSamplingRate() int {
+	return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
+}
+
 type dir int
 
 const (
@@ -98,11 +103,21 @@
 	seq    uint64
 	strict bool
 
-	dir      dir
-	key      []byte
-	value    []byte
-	err      error
-	releaser util.Releaser
+	samplingGap int
+	dir         dir
+	key         []byte
+	value       []byte
+	err         error
+	releaser    util.Releaser
+}
+
+func (i *dbIter) sampleSeek() {
+	ikey := i.iter.Key()
+	i.samplingGap -= len(ikey) + len(i.iter.Value())
+	for i.samplingGap < 0 {
+		i.samplingGap += i.db.iterSamplingRate()
+		i.db.sampleSeek(ikey)
+	}
 }
 
 func (i *dbIter) setErr(err error) {
@@ -175,6 +190,7 @@
 func (i *dbIter) next() bool {
 	for {
 		if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+			i.sampleSeek()
 			if seq <= i.seq {
 				switch kt {
 				case ktDel:
@@ -225,6 +241,7 @@
 	if i.iter.Valid() {
 		for {
 			if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
+				i.sampleSeek()
 				if seq <= i.seq {
 					if !del && i.icmp.uCompare(ukey, i.key) < 0 {
 						return true
@@ -266,6 +283,7 @@
 	case dirForward:
 		for i.iter.Prev() {
 			if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil {
+				i.sampleSeek()
 				if i.icmp.uCompare(ukey, i.key) < 0 {
 					goto cont
 				}
diff --git a/leveldb/db_state.go b/leveldb/db_state.go
index 24ecab5..d4db9d6 100644
--- a/leveldb/db_state.go
+++ b/leveldb/db_state.go
@@ -48,6 +48,15 @@
 	atomic.AddUint64(&db.seq, delta)
 }
 
+func (db *DB) sampleSeek(ikey iKey) {
+	v := db.s.version()
+	if v.sampleSeek(ikey) {
+		// Trigger table compaction.
+		db.compSendTrigger(db.tcompCmdC)
+	}
+	v.release()
+}
+
 func (db *DB) mpoolPut(mem *memdb.DB) {
 	defer func() {
 		recover()
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 0acb567..e8237cb 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -405,19 +405,21 @@
 	t.Log("DB range compaction done")
 }
 
-func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) {
-	t := h.t
-	db := h.db
-
-	s, err := db.SizeOf([]util.Range{
+func (h *dbHarness) sizeOf(start, limit string) uint64 {
+	sz, err := h.db.SizeOf([]util.Range{
 		{[]byte(start), []byte(limit)},
 	})
 	if err != nil {
-		t.Error("SizeOf: got error: ", err)
+		h.t.Error("SizeOf: got error: ", err)
 	}
-	if s.Sum() < low || s.Sum() > hi {
-		t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d",
-			shorten(start), shorten(limit), low, hi, s.Sum())
+	return sz.Sum()
+}
+
+func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) {
+	sz := h.sizeOf(start, limit)
+	if sz < low || sz > hi {
+		h.t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d",
+			shorten(start), shorten(limit), low, hi, sz)
 	}
 }
 
@@ -2577,3 +2579,83 @@
 	}
 	v.release()
 }
+
+func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
+	const (
+		vSize = 200 * opt.KiB
+		tSize = 100 * opt.MiB
+		mIter = 100
+		n     = tSize / vSize
+	)
+
+	h := newDbHarnessWopt(t, &opt.Options{
+		Compression:       opt.NoCompression,
+		DisableBlockCache: true,
+	})
+	defer h.close()
+
+	key := func(x int) string {
+		return fmt.Sprintf("v%06d", x)
+	}
+
+	// Fill.
+	value := strings.Repeat("x", vSize)
+	for i := 0; i < n; i++ {
+		h.put(key(i), value)
+	}
+	h.compactMem()
+
+	// Delete all.
+	for i := 0; i < n; i++ {
+		h.delete(key(i))
+	}
+	h.compactMem()
+
+	var (
+		limit = n / limitDiv
+
+		startKey = key(0)
+		limitKey = key(limit)
+		maxKey   = key(n)
+		slice    = &util.Range{Limit: []byte(limitKey)}
+
+		initialSize0 = h.sizeOf(startKey, limitKey)
+		initialSize1 = h.sizeOf(limitKey, maxKey)
+	)
+
+	t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
+
+	for r := 0; true; r++ {
+		if r >= mIter {
+			t.Fatal("taking too long to compact")
+		}
+
+		// Iterate over the deleted range.
+		iter := h.db.NewIterator(slice, h.ro)
+		for iter.Next() {
+		}
+		if err := iter.Error(); err != nil {
+			t.Fatalf("Iter err: %v", err)
+		}
+		iter.Release()
+
+		// Wait for compaction.
+		h.waitCompaction()
+
+		// Check size.
+		size0 := h.sizeOf(startKey, limitKey)
+		size1 := h.sizeOf(limitKey, maxKey)
+		t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
+		if size0 < initialSize0 {
+			break
+		}
+	}
+}
+
+func TestDB_IterTriggeredCompaction(t *testing.T) {
+	testDB_IterTriggeredCompaction(t, 1)
+}
+
+func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
+	testDB_IterTriggeredCompaction(t, 2)
+}
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 2b42897..61f0ead 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -34,10 +34,11 @@
 	DefaultCompactionTotalSize           = 10 * MiB
 	DefaultCompactionTotalSizeMultiplier = 10.0
 	DefaultCompressionType               = SnappyCompression
-	DefaultOpenFilesCacher               = LRUCacher
-	DefaultOpenFilesCacheCapacity        = 500
+	DefaultIteratorSamplingRate          = 1 * MiB
 	DefaultMaxMemCompationLevel          = 2
 	DefaultNumLevel                      = 7
+	DefaultOpenFilesCacher               = LRUCacher
+	DefaultOpenFilesCacheCapacity        = 500
 	DefaultWriteBuffer                   = 4 * MiB
 	DefaultWriteL0PauseTrigger           = 12
 	DefaultWriteL0SlowdownTrigger        = 8
@@ -288,6 +289,13 @@
 	// The default value is nil.
 	Filter filter.Filter
 
+	// IteratorSamplingRate defines the approximate gap (in bytes) between
+	// read samplings of an iterator. The samples are used to determine when
+	// table compaction should be triggered.
+	//
+	// The default is 1MiB.
+	IteratorSamplingRate int
+
 	// MaxMemCompationLevel defines maximum level a newly compacted 'memdb'
 	// will be pushed into if doesn't creates overlap. This should less than
 	// NumLevel. Use -1 for level-0.
@@ -492,6 +500,13 @@
 	return o.Filter
 }
 
+func (o *Options) GetIteratorSamplingRate() int {
+	if o == nil || o.IteratorSamplingRate <= 0 {
+		return DefaultIteratorSamplingRate
+	}
+	return o.IteratorSamplingRate
+}
+
 func (o *Options) GetMaxMemCompationLevel() int {
 	level := DefaultMaxMemCompationLevel
 	if o != nil {
diff --git a/leveldb/version.go b/leveldb/version.go
index 5ab7b53..88a52f5 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -136,9 +136,8 @@
 		if !tseek {
 			if tset == nil {
 				tset = &tSet{level, t}
-			} else if tset.table.consumeSeek() <= 0 {
+			} else {
 				tseek = true
-				tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
 			}
 		}
 
@@ -203,6 +202,28 @@
 		return true
 	})
 
+	if tseek && tset.table.consumeSeek() <= 0 {
+		tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
+	}
+
+	return
+}
+
+func (v *version) sampleSeek(ikey iKey) (tcomp bool) {
+	var tset *tSet
+
+	v.walkOverlapping(ikey, func(level int, t *tFile) bool {
+		if tset == nil {
+			tset = &tSet{level, t}
+			return true
+		} else {
+			if tset.table.consumeSeek() <= 0 {
+				tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
+			}
+			return false
+		}
+	}, nil)
+
 	return
 }