level: fixes table compaction that may generate tables with overlapped ukey

This becomes a problem when a new compaction is picked up and the table key range search
isn't expanded, on the assumption that a ukey never hops across tables (true in level > 0).
When this happens a newer ukey might be pushed into a lower level, and since version.get
assumes that this never happens, version.get will incorrectly return the older ukey instead.

This commit also fixes compaction retry, and will always expand the key range search
when picking up a new compaction.

Fixes #83.
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index ce6d32c..447407a 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -172,12 +172,17 @@
 	*cnt++
 }
 
-func (db *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) {
+type compactionTransactInterface interface {
+	run(cnt *compactionTransactCounter) error
+	revert() error
+}
+
+func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
 	defer func() {
 		if x := recover(); x != nil {
-			if x == errCompactionTransactExiting && rollback != nil {
-				if err := rollback(); err != nil {
-					db.logf("%s rollback error %q", name, err)
+			if x == errCompactionTransactExiting {
+				if err := t.revert(); err != nil {
+					db.logf("%s revert error %q", name, err)
 				}
 			}
 			panic(x)
@@ -207,7 +212,7 @@
 
 		// Execute.
 		cnt := compactionTransactCounter(0)
-		err := exec(&cnt)
+		err := t.run(&cnt)
 		if err != nil {
 			db.logf("%s error I·%d %q", name, cnt, err)
 		}
@@ -257,6 +262,26 @@
 	}
 }
 
+type compactionTransactFunc struct {
+	runFunc    func(cnt *compactionTransactCounter) error
+	revertFunc func() error
+}
+
+func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
+	return t.runFunc(cnt)
+}
+
+func (t *compactionTransactFunc) revert() error {
+	if t.revertFunc != nil {
+		return t.revertFunc()
+	}
+	return nil
+}
+
+func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
+	db.compactionTransact(name, &compactionTransactFunc{run, revert})
+}
+
 func (db *DB) compactionExitTransact() {
 	panic(errCompactionTransactExiting)
 }
@@ -292,13 +317,13 @@
 		return
 	}
 
-	db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
+	db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) {
 		stats.startTimer()
 		defer stats.stopTimer()
 		return c.flush(mem.mdb, -1)
 	}, func() error {
 		for _, r := range c.rec.addedTables {
-			db.logf("mem@flush rollback @%d", r.num)
+			db.logf("mem@flush revert @%d", r.num)
 			f := db.s.getTableFile(r.num)
 			if err := f.Remove(); err != nil {
 				return err
@@ -307,7 +332,7 @@
 		return nil
 	})
 
-	db.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) {
+	db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) {
 		stats.startTimer()
 		defer stats.stopTimer()
 		return c.commit(db.journalFile.Num(), db.frozenSeq)
@@ -337,6 +362,198 @@
 	db.compSendTrigger(db.tcompCmdC)
 }
 
+type tableCompactionBuilder struct {
+	db           *DB
+	s            *session
+	c            *compaction
+	rec          *sessionRecord
+	stat0, stat1 *cStatsStaging
+
+	snapHasLastUkey bool
+	snapLastUkey    []byte
+	snapLastSeq     uint64
+	snapIter        int
+	snapKerrCnt     int
+	snapDropCnt     int
+
+	kerrCnt int
+	dropCnt int
+
+	minSeq    uint64
+	strict    bool
+	tableSize int
+
+	tw *tWriter
+}
+
+func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
+	// Create new table if not already.
+	if b.tw == nil {
+		// Check for pause event.
+		if b.db != nil {
+			select {
+			case ch := <-b.db.tcompPauseC:
+				b.db.pauseCompaction(ch)
+			case _, _ = <-b.db.closeC:
+				b.db.compactionExitTransact()
+			default:
+			}
+		}
+
+		// Create new table.
+		var err error
+		b.tw, err = b.s.tops.create()
+		if err != nil {
+			return err
+		}
+	}
+
+	// Write key/value into table.
+	return b.tw.append(key, value)
+}
+
+func (b *tableCompactionBuilder) needFlush() bool {
+	return b.tw.tw.BytesLen() >= b.tableSize
+}
+
+func (b *tableCompactionBuilder) flush() error {
+	t, err := b.tw.finish()
+	if err != nil {
+		return err
+	}
+	b.rec.addTableFile(b.c.level+1, t)
+	b.stat1.write += t.size
+	b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.level+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+	b.tw = nil
+	return nil
+}
+
+func (b *tableCompactionBuilder) cleanup() {
+	if b.tw != nil {
+		b.tw.drop()
+		b.tw = nil
+	}
+}
+
+func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
+	snapResumed := b.snapIter > 0
+	hasLastUkey := b.snapHasLastUkey // The key might have zero length, so this is necessary.
+	lastUkey := append([]byte{}, b.snapLastUkey...)
+	lastSeq := b.snapLastSeq
+	b.kerrCnt = b.snapKerrCnt
+	b.dropCnt = b.snapDropCnt
+	// Restore compaction state.
+	b.c.restore()
+
+	defer b.cleanup()
+
+	b.stat1.startTimer()
+	defer b.stat1.stopTimer()
+
+	iter := b.c.newIterator()
+	defer iter.Release()
+	for i := 0; iter.Next(); i++ {
+		// Incr transact counter.
+		cnt.incr()
+
+		// Skip until last state.
+		if i < b.snapIter {
+			continue
+		}
+
+		resumed := false
+		if snapResumed {
+			resumed = true
+			snapResumed = false
+		}
+
+		ikey := iter.Key()
+		ukey, seq, kt, kerr := parseIkey(ikey)
+
+		if kerr == nil {
+			shouldStop := !resumed && b.c.shouldStopBefore(ikey)
+
+			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
+				// First occurrence of this user key.
+
+				// Only rotate tables if ukey doesn't hop across.
+				if b.tw != nil && (shouldStop || b.needFlush()) {
+					if err := b.flush(); err != nil {
+						return err
+					}
+
+					// Creates snapshot of the state.
+					b.c.save()
+					b.snapHasLastUkey = hasLastUkey
+					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
+					b.snapLastSeq = lastSeq
+					b.snapIter = i
+					b.snapKerrCnt = b.kerrCnt
+					b.snapDropCnt = b.dropCnt
+				}
+
+				hasLastUkey = true
+				lastUkey = append(lastUkey[:0], ukey...)
+				lastSeq = kMaxSeq
+			}
+
+			switch {
+			case lastSeq <= b.minSeq:
+				// Dropped because newer entry for same user key exist
+				fallthrough // (A)
+			case kt == ktDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
+				// For this user key:
+				// (1) there is no data in higher levels
+				// (2) data in lower levels will have larger seq numbers
+				// (3) data in layers that are being compacted here and have
+				//     smaller seq numbers will be dropped in the next
+				//     few iterations of this loop (by rule (A) above).
+				// Therefore this deletion marker is obsolete and can be dropped.
+				lastSeq = seq
+				b.dropCnt++
+				continue
+			default:
+				lastSeq = seq
+			}
+		} else {
+			if b.strict {
+				return kerr
+			}
+
+			// Don't drop corrupted keys.
+			hasLastUkey = false
+			lastUkey = lastUkey[:0]
+			lastSeq = kMaxSeq
+			b.kerrCnt++
+		}
+
+		if err := b.appendKV(ikey, iter.Value()); err != nil {
+			return err
+		}
+	}
+
+	if err := iter.Error(); err != nil {
+		return err
+	}
+
+	// Finish last table.
+	if b.tw != nil && !b.tw.empty() {
+		return b.flush()
+	}
+	return nil
+}
+
+func (b *tableCompactionBuilder) revert() error {
+	for _, at := range b.rec.addedTables {
+		b.s.logf("table@build revert @%d", at.num)
+		f := b.s.getTableFile(at.num)
+		if err := f.Remove(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
 	defer c.release()
 
@@ -348,7 +565,7 @@
 		db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
 		rec.delTable(c.level, t.file.Num())
 		rec.addTableFile(c.level+1, t)
-		db.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) {
+		db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) {
 			return db.s.commit(rec)
 		}, nil)
 		return
@@ -366,192 +583,27 @@
 	minSeq := db.minSeq()
 	db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
 
-	var (
-		snapHasLastUkey bool
-		snapLastUkey    []byte
-		snapLastSeq     uint64
-		snapIter        int
-		snapKerrCnt     int
-		snapDropCnt     int
-
-		kerrCnt int
-		dropCnt int
-
-		strict    = db.s.o.GetStrict(opt.StrictCompaction)
-		tableSize = db.s.o.GetCompactionTableSize(c.level + 1)
-	)
-	db.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) {
-		hasLastUkey := snapHasLastUkey // The key might has zero length, so this is necessary.
-		lastUkey := append([]byte{}, snapLastUkey...)
-		lastSeq := snapLastSeq
-		kerrCnt = snapKerrCnt
-		dropCnt = snapDropCnt
-		snapSched := snapIter == 0
-
-		var tw *tWriter
-		finish := func() error {
-			t, err := tw.finish()
-			if err != nil {
-				return err
-			}
-			rec.addTableFile(c.level+1, t)
-			stats[1].write += t.size
-			db.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
-			return nil
-		}
-
-		defer func() {
-			stats[1].stopTimer()
-			if tw != nil {
-				tw.drop()
-				tw = nil
-			}
-		}()
-
-		stats[1].startTimer()
-		iter := c.newIterator()
-		defer iter.Release()
-		for i := 0; iter.Next(); i++ {
-			// Incr transact counter.
-			cnt.incr()
-
-			// Skip until last state.
-			if i < snapIter {
-				continue
-			}
-
-			ikey := iter.Key()
-			ukey, seq, kt, kerr := parseIkey(ikey)
-
-			// Skip this if key is corrupted.
-			if kerr == nil && c.shouldStopBefore(ikey) && tw != nil {
-				err = finish()
-				if err != nil {
-					return
-				}
-				snapSched = true
-				tw = nil
-			}
-
-			// Scheduled for snapshot, snapshot will used to retry compaction
-			// if error occured.
-			if snapSched {
-				snapHasLastUkey = hasLastUkey
-				snapLastUkey = append(snapLastUkey[:0], lastUkey...)
-				snapLastSeq = lastSeq
-				snapIter = i
-				snapKerrCnt = kerrCnt
-				snapDropCnt = dropCnt
-				snapSched = false
-			}
-
-			if kerr == nil {
-				if !hasLastUkey || db.s.icmp.uCompare(lastUkey, ukey) != 0 {
-					// First occurrence of this user key.
-					hasLastUkey = true
-					lastUkey = append(lastUkey[:0], ukey...)
-					lastSeq = kMaxSeq
-				}
-
-				switch {
-				case lastSeq <= minSeq:
-					// Dropped because newer entry for same user key exist
-					fallthrough // (A)
-				case kt == ktDel && seq <= minSeq && c.baseLevelForKey(lastUkey):
-					// For this user key:
-					// (1) there is no data in higher levels
-					// (2) data in lower levels will have larger seq numbers
-					// (3) data in layers that are being compacted here and have
-					//     smaller seq numbers will be dropped in the next
-					//     few iterations of this loop (by rule (A) above).
-					// Therefore this deletion marker is obsolete and can be dropped.
-					lastSeq = seq
-					dropCnt++
-					continue
-				default:
-					lastSeq = seq
-				}
-			} else {
-				if strict {
-					return kerr
-				}
-
-				// Don't drop corrupted keys
-				hasLastUkey = false
-				lastUkey = lastUkey[:0]
-				lastSeq = kMaxSeq
-				kerrCnt++
-			}
-
-			// Create new table if not already
-			if tw == nil {
-				// Check for pause event.
-				select {
-				case ch := <-db.tcompPauseC:
-					db.pauseCompaction(ch)
-				case _, _ = <-db.closeC:
-					db.compactionExitTransact()
-				default:
-				}
-
-				// Create new table.
-				tw, err = db.s.tops.create()
-				if err != nil {
-					return
-				}
-			}
-
-			// Write key/value into table
-			err = tw.append(ikey, iter.Value())
-			if err != nil {
-				return
-			}
-
-			// Finish table if it is big enough
-			if tw.tw.BytesLen() >= tableSize {
-				err = finish()
-				if err != nil {
-					return
-				}
-				snapSched = true
-				tw = nil
-			}
-		}
-
-		err = iter.Error()
-		if err != nil {
-			return
-		}
-
-		// Finish last table
-		if tw != nil && !tw.empty() {
-			err = finish()
-			if err != nil {
-				return
-			}
-			tw = nil
-		}
-		return
-	}, func() error {
-		for _, r := range rec.addedTables {
-			db.logf("table@build rollback @%d", r.num)
-			f := db.s.getTableFile(r.num)
-			if err := f.Remove(); err != nil {
-				return err
-			}
-		}
-		return nil
-	})
+	b := &tableCompactionBuilder{
+		db:        db,
+		s:         db.s,
+		c:         c,
+		rec:       rec,
+		stat1:     &stats[1],
+		minSeq:    minSeq,
+		strict:    db.s.o.GetStrict(opt.StrictCompaction),
+		tableSize: db.s.o.GetCompactionTableSize(c.level + 1),
+	}
+	db.compactionTransact("table@build", b)
 
 	// Commit changes
-	db.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) {
+	db.compactionTransactFunc("table@commit", func(cnt *compactionTransactCounter) (err error) {
 		stats[1].startTimer()
 		defer stats[1].stopTimer()
 		return db.s.commit(rec)
 	}, nil)
 
 	resultSize := int(stats[1].write)
-	db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), kerrCnt, dropCnt, stats[1].duration)
+	db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
 
 	// Save compaction stats
 	for i := range stats {
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 65e2074..220129c 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2298,3 +2298,271 @@
 
 	wg.Wait()
 }
+
+func TestDb_UkeyShouldntHopAcrossTable(t *testing.T) {
+	h := newDbHarnessWopt(t, &opt.Options{
+		WriteBuffer:                 112 * opt.KiB,
+		CompactionTableSize:         90 * opt.KiB,
+		CompactionExpandLimitFactor: 1,
+	})
+	defer h.close()
+
+	const (
+		nSnap = 190
+		nKey  = 140
+	)
+
+	var (
+		snaps [nSnap]*Snapshot
+		b     = &Batch{}
+	)
+	for i := range snaps {
+		vtail := fmt.Sprintf("VAL%030d", i)
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%08d", k)
+			b.Put([]byte(key), []byte(key+vtail))
+		}
+		if err := h.db.Write(b, nil); err != nil {
+			t.Fatalf("WRITE #%d error: %v", i, err)
+		}
+
+		snaps[i] = h.db.newSnapshot()
+		b.Reset()
+		for k := 0; k < nKey; k++ {
+			key := fmt.Sprintf("KEY%08d", k)
+			b.Delete([]byte(key))
+		}
+		if err := h.db.Write(b, nil); err != nil {
+			t.Fatalf("WRITE #%d  error: %v", i, err)
+		}
+	}
+
+	h.compactMem()
+
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+
+	h.compactRangeAt(0, "", "")
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+	h.compactRangeAt(1, "", "")
+	h.waitCompaction()
+	for level, tables := range h.db.s.stVersion.tables {
+		for _, table := range tables {
+			t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+		}
+	}
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	wg := &sync.WaitGroup{}
+	for i, snap := range snaps {
+		wg.Add(1)
+
+		go func(i int, snap *Snapshot) {
+			defer wg.Done()
+
+			vtail := fmt.Sprintf("VAL%030d", i)
+			for k := 0; k < nKey; k++ {
+				key := fmt.Sprintf("KEY%08d", k)
+				xvalue, err := snap.Get([]byte(key), nil)
+				if err != nil {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
+				}
+				value := key + vtail
+				if !bytes.Equal([]byte(value), xvalue) {
+					t.Fatalf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
+				}
+			}
+		}(i, snap)
+	}
+
+	wg.Wait()
+}
+
+func TestDb_TableCompactionBuilder(t *testing.T) {
+	stor := newTestStorage(t)
+	defer stor.Close()
+
+	const nSeq = 99
+
+	o := &opt.Options{
+		WriteBuffer:                 112 * opt.KiB,
+		CompactionTableSize:         43 * opt.KiB,
+		CompactionExpandLimitFactor: 1,
+		CompactionGPOverlapsFactor:  1,
+		BlockCache:                  opt.NoCache,
+	}
+	s, err := newSession(stor, o)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := s.create(); err != nil {
+		t.Fatal(err)
+	}
+	defer s.close()
+	var (
+		seq        uint64
+		targetSize = 5 * o.CompactionTableSize
+		value      = bytes.Repeat([]byte{'0'}, 100)
+	)
+	for i := 0; i < 2; i++ {
+		tw, err := s.tops.create()
+		if err != nil {
+			t.Fatal(err)
+		}
+		for k := 0; tw.tw.BytesLen() < targetSize; k++ {
+			key := []byte(fmt.Sprintf("%09d", k))
+			seq += nSeq - 1
+			for x := uint64(0); x < nSeq; x++ {
+				if err := tw.append(newIkey(key, seq-x, ktVal), value); err != nil {
+					t.Fatal(err)
+				}
+			}
+		}
+		tf, err := tw.finish()
+		if err != nil {
+			t.Fatal(err)
+		}
+		rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+		rec.addTableFile(i, tf)
+		if err := s.commit(rec); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Build grandparent.
+	v := s.version()
+	c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+	rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b := &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize/3 + 961,
+	}
+	if err := b.run(new(compactionTransactCounter)); err != nil {
+		t.Fatal(err)
+	}
+	for _, t := range c.tables[0] {
+		rec.delTable(c.level, t.file.Num())
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	// Build level-1.
+	v = s.version()
+	c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
+	rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b = &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize,
+	}
+	if err := b.run(new(compactionTransactCounter)); err != nil {
+		t.Fatal(err)
+	}
+	for _, t := range c.tables[0] {
+		rec.delTable(c.level, t.file.Num())
+	}
+	// Move grandparent to level-3
+	for _, t := range v.tables[2] {
+		rec.delTable(2, t.file.Num())
+		rec.addTableFile(3, t)
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	v = s.version()
+	for level, want := range []bool{false, true, false, true, false} {
+		got := len(v.tables[level]) > 0
+		if want != got {
+			t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
+		}
+	}
+	for i, f := range v.tables[1][:len(v.tables[1])-1] {
+		nf := v.tables[1][i+1]
+		if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
+			t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.file.Num(), nf.file.Num())
+		}
+	}
+	v.release()
+
+	// Compaction with transient error.
+	v = s.version()
+	c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+	rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+	b = &tableCompactionBuilder{
+		s:         s,
+		c:         c,
+		rec:       rec,
+		stat1:     new(cStatsStaging),
+		minSeq:    0,
+		strict:    true,
+		tableSize: o.CompactionTableSize,
+	}
+	stor.SetEmuErrOnce(storage.TypeTable, tsOpSync)
+	stor.SetEmuRandErr(storage.TypeTable, tsOpRead, tsOpReadAt, tsOpWrite)
+	stor.SetEmuRandErrProb(0xf0)
+	for {
+		if err := b.run(new(compactionTransactCounter)); err != nil {
+			t.Logf("(expected) b.run: %v", err)
+		} else {
+			break
+		}
+	}
+	if err := s.commit(rec); err != nil {
+		t.Fatal(err)
+	}
+	c.release()
+
+	stor.SetEmuErrOnce(0, tsOpSync)
+	stor.SetEmuRandErr(0, tsOpRead, tsOpReadAt, tsOpWrite)
+
+	v = s.version()
+	if len(v.tables[1]) != len(v.tables[2]) {
+		t.Fatalf("invalid tables length, want %d, got %d", len(v.tables[1]), len(v.tables[2]))
+	}
+	for i, f0 := range v.tables[1] {
+		f1 := v.tables[2][i]
+		iter0 := s.tops.newIterator(f0, nil, nil)
+		iter1 := s.tops.newIterator(f1, nil, nil)
+		for j := 0; true; j++ {
+			next0 := iter0.Next()
+			next1 := iter1.Next()
+			if next0 != next1 {
+				t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
+			}
+			key0 := iter0.Key()
+			key1 := iter1.Key()
+			if !bytes.Equal(key0, key1) {
+				t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
+			}
+			if next0 == false {
+				break
+			}
+		}
+		iter0.Release()
+		iter1.Release()
+	}
+	v.release()
+}
diff --git a/leveldb/session.go b/leveldb/session.go
index e39adaa..5b251ef 100644
--- a/leveldb/session.go
+++ b/leveldb/session.go
@@ -240,21 +240,7 @@
 		}
 	}
 
-	if level == 0 {
-		imin, imax := t0.getRange(s.icmp)
-		t0 = v.tables[0].getOverlaps(t0[:0], s.icmp, imin.ukey(), imax.ukey(), true)
-	}
-
-	c := &compaction{
-		s:             s,
-		v:             v,
-		level:         level,
-		maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
-		tPtrs:         make([]int, v.s.o.GetNumLevel()),
-	}
-	c.tables[0] = t0
-	c.expand()
-	return c
+	return newCompaction(s, v, level, t0)
 }
 
 // Create compaction from given level and range; need external synchronization.
@@ -284,15 +270,20 @@
 		}
 	}
 
+	return newCompaction(s, v, level, t0)
+}
+
+func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
 	c := &compaction{
 		s:             s,
 		v:             v,
 		level:         level,
+		tables:        [2]tFiles{t0, nil},
 		maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
-		tPtrs:         make([]int, v.s.o.GetNumLevel()),
+		tPtrs:         make([]int, s.o.GetNumLevel()),
 	}
-	c.tables[0] = t0
 	c.expand()
+	c.save()
 	return c
 }
 
@@ -306,12 +297,31 @@
 	maxGPOverlaps uint64
 
 	gp                tFiles
-	gpidx             int
+	gpi               int
 	seenKey           bool
 	gpOverlappedBytes uint64
 	imin, imax        iKey
 	tPtrs             []int
 	released          bool
+
+	snapGPI               int
+	snapSeenKey           bool
+	snapGPOverlappedBytes uint64
+	snapTPtrs             []int
+}
+
+func (c *compaction) save() {
+	c.snapGPI = c.gpi
+	c.snapSeenKey = c.seenKey
+	c.snapGPOverlappedBytes = c.gpOverlappedBytes
+	c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
+}
+
+func (c *compaction) restore() {
+	c.gpi = c.snapGPI
+	c.seenKey = c.snapSeenKey
+	c.gpOverlappedBytes = c.snapGPOverlappedBytes
+	c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
 }
 
 func (c *compaction) release() {
@@ -328,6 +338,11 @@
 
 	t0, t1 := c.tables[0], c.tables[1]
 	imin, imax := t0.getRange(c.s.icmp)
+	// We expand t0 here just in case a ukey hops across tables.
+	t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
+	if len(t0) != len(c.tables[0]) {
+		imin, imax = t0.getRange(c.s.icmp)
+	}
 	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
 	// Get entire range covered by compaction.
 	amin, amax := append(t0, t1...).getRange(c.s.icmp)
@@ -384,8 +399,8 @@
 }
 
 func (c *compaction) shouldStopBefore(ikey iKey) bool {
-	for ; c.gpidx < len(c.gp); c.gpidx++ {
-		gp := c.gp[c.gpidx]
+	for ; c.gpi < len(c.gp); c.gpi++ {
+		gp := c.gp[c.gpi]
 		if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
 			break
 		}
diff --git a/leveldb/storage_test.go b/leveldb/storage_test.go
index f39ed6d..06dfd2e 100644
--- a/leveldb/storage_test.go
+++ b/leveldb/storage_test.go
@@ -37,7 +37,7 @@
 	tsNum     = 0
 )
 
-type tsOp int
+type tsOp uint
 
 const (
 	tsOpOpen tsOp = iota
@@ -132,7 +132,7 @@
 
 func (tw tsWriter) Close() (err error) {
 	err = tw.Writer.Close()
-	tw.tf.close("reader", err)
+	tw.tf.close("writer", err)
 	return
 }
 
@@ -264,20 +264,23 @@
 	readCnt       uint64
 	readCntEn     storage.FileType
 
-	emuErr        [tsOpNum]storage.FileType
-	emuRandErr    [tsOpNum]storage.FileType
-	emuRandErrMap map[uint64]tsOp
-	emuRandRand   *rand.Rand
+	emuErr         [tsOpNum]storage.FileType
+	emuErrOnce     [tsOpNum]storage.FileType
+	emuRandErr     [tsOpNum]storage.FileType
+	emuRandErrProb int
+	emuErrOnceMap  map[uint64]uint
+	emuRandRand    *rand.Rand
 }
 
 func (ts *testStorage) shouldErr(tf tsFile, op tsOp) bool {
 	if ts.emuErr[op]&tf.Type() != 0 {
 		return true
-	} else if ts.emuRandErr[op]&tf.Type() != 0 {
-		eop := ts.emuRandErrMap[tf.x()]
-		if eop&op == 0 && ts.emuRandRand.Int()%0x999 == 0 {
-			ts.emuRandErrMap[tf.x()] = eop | op
-			ts.t.Logf("I: random emulated error: file=%d type=%v op=%v", tf.Num(), tf.Type(), op)
+	} else if ts.emuRandErr[op]&tf.Type() != 0 || ts.emuErrOnce[op]&tf.Type() != 0 {
+		sop := uint(1) << op
+		eop := ts.emuErrOnceMap[tf.x()]
+		if eop&sop == 0 && (ts.emuRandRand.Int()%ts.emuRandErrProb == 0 || ts.emuErrOnce[op]&tf.Type() != 0) {
+			ts.emuErrOnceMap[tf.x()] = eop | sop
+			ts.t.Logf("I: emulated error: file=%d type=%v op=%v", tf.Num(), tf.Type(), op)
 			return true
 		}
 	}
@@ -292,6 +295,14 @@
 	ts.mu.Unlock()
 }
 
+func (ts *testStorage) SetEmuErrOnce(t storage.FileType, ops ...tsOp) {
+	ts.mu.Lock()
+	for _, op := range ops {
+		ts.emuErrOnce[op] = t
+	}
+	ts.mu.Unlock()
+}
+
 func (ts *testStorage) SetEmuRandErr(t storage.FileType, ops ...tsOp) {
 	ts.mu.Lock()
 	for _, op := range ops {
@@ -300,6 +311,12 @@
 	ts.mu.Unlock()
 }
 
+func (ts *testStorage) SetEmuRandErrProb(prob int) {
+	ts.mu.Lock()
+	ts.emuRandErrProb = prob
+	ts.mu.Unlock()
+}
+
 func (ts *testStorage) DelaySync(t storage.FileType) {
 	ts.mu.Lock()
 	ts.emuDelaySync |= t
@@ -488,12 +505,13 @@
 		stor = storage.NewMemStorage()
 	}
 	ts := &testStorage{
-		t:             t,
-		Storage:       stor,
-		closeFn:       closeFn,
-		opens:         make(map[uint64]bool),
-		emuRandErrMap: make(map[uint64]tsOp),
-		emuRandRand:   rand.New(rand.NewSource(0xfacedead)),
+		t:              t,
+		Storage:        stor,
+		closeFn:        closeFn,
+		opens:          make(map[uint64]bool),
+		emuErrOnceMap:  make(map[uint64]uint),
+		emuRandErrProb: 0x999,
+		emuRandRand:    rand.New(rand.NewSource(0xfacedead)),
 	}
 	ts.cond.L = &ts.mu
 	return ts
diff --git a/leveldb/table.go b/leveldb/table.go
index 5941073..ea6eb29 100644
--- a/leveldb/table.go
+++ b/leveldb/table.go
@@ -7,6 +7,7 @@
 package leveldb
 
 import (
+	"fmt"
 	"sort"
 	"sync/atomic"
 
@@ -82,6 +83,18 @@
 func (tf tFiles) Len() int      { return len(tf) }
 func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
 
+func (tf tFiles) nums() string {
+	x := "[ "
+	for i, f := range tf {
+		if i != 0 {
+			x += ", "
+		}
+		x += fmt.Sprint(f.file.Num())
+	}
+	x += " ]"
+	return x
+}
+
 // Returns true if i smallest key is less than j.
 // This used for sort by key in ascending order.
 func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
@@ -159,24 +172,25 @@
 }
 
 // Returns tables whose its key range overlaps with given key range.
-// If overlapped is true then the search will be expanded to tables that
-// overlaps with each other.
+// Range will be expanded if ukey found hop across tables.
+// If overlapped is true then the search will be restarted if umax
+// expanded.
+// The dst content will be overwritten.
 func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
-	x := len(dst)
+	dst = dst[:0]
 	for i := 0; i < len(tf); {
 		t := tf[i]
 		if t.overlaps(icmp, umin, umax) {
-			if overlapped {
-				// For overlapped files, check if the newly added file has
-				// expanded the range. If so, restart search.
-				if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
-					umin = t.imin.ukey()
-					dst = dst[:x]
-					i = 0
-					continue
-				} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
-					umax = t.imax.ukey()
-					dst = dst[:x]
+			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
+				umin = t.imin.ukey()
+				dst = dst[:0]
+				i = 0
+				continue
+			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
+				umax = t.imax.ukey()
+				// Restart search if it is overlapped.
+				if overlapped {
+					dst = dst[:0]
 					i = 0
 					continue
 				}
@@ -446,28 +460,34 @@
 	return w.first == nil
 }
 
+// Closes the storage.Writer.
+func (w *tWriter) close() {
+	if w.w != nil {
+		w.w.Close()
+		w.w = nil
+	}
+}
+
 // Finalizes the table and returns table file.
 func (w *tWriter) finish() (f *tFile, err error) {
+	defer w.close()
 	err = w.tw.Close()
 	if err != nil {
 		return
 	}
 	err = w.w.Sync()
 	if err != nil {
-		w.w.Close()
 		return
 	}
-	w.w.Close()
 	f = newTableFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last))
 	return
 }
 
 // Drops the table.
 func (w *tWriter) drop() {
-	w.w.Close()
+	w.close()
 	w.file.Remove()
 	w.t.s.reuseFileNum(w.file.Num())
-	w.w = nil
 	w.file = nil
 	w.tw = nil
 	w.first = nil
diff --git a/leveldb/util.go b/leveldb/util.go
index a43d2e4..1a5bf71 100644
--- a/leveldb/util.go
+++ b/leveldb/util.go
@@ -14,10 +14,10 @@
 )
 
 func shorten(str string) string {
-	if len(str) <= 4 {
+	if len(str) <= 8 {
 		return str
 	}
-	return str[:1] + ".." + str[len(str)-1:]
+	return str[:3] + ".." + str[len(str)-3:]
 }
 
 var bunits = [...]string{"", "Ki", "Mi", "Gi"}