leveldb: cleanup DB.recoverJournal(), memdb, session record and split session.go
diff --git a/leveldb/db.go b/leveldb/db.go
index b948214..caf9b07 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -275,7 +275,7 @@
// We will drop corrupted table.
strict = o.GetStrict(opt.StrictRecovery)
- rec = &sessionRecord{numLevel: o.GetNumLevel()}
+ rec = &sessionRecord{}
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
)
buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
@@ -450,132 +450,132 @@
}
func (db *DB) recoverJournal() error {
- // Get all tables and sort it by file number.
- journalFiles_, err := db.s.getFiles(storage.TypeJournal)
+	// Get all journals and sort them by file number.
+ allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
if err != nil {
return err
}
- journalFiles := files(journalFiles_)
- journalFiles.sort()
+ files(allJournalFiles).sort()
- // Discard older journal.
- prev := -1
- for i, file := range journalFiles {
- if file.Num() >= db.s.stJournalNum {
- if prev >= 0 {
- i--
- journalFiles[i] = journalFiles[prev]
- }
- journalFiles = journalFiles[i:]
- break
- } else if file.Num() == db.s.stPrevJournalNum {
- prev = i
+ // Journals that will be recovered.
+ var recJournalFiles []storage.File
+ for _, jf := range allJournalFiles {
+ if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
+ recJournalFiles = append(recJournalFiles, jf)
}
}
- var jr *journal.Reader
- var of storage.File
- var mem *memdb.DB
- batch := new(Batch)
- cm := newCMem(db.s)
- buf := new(util.Buffer)
- // Options.
- strict := db.s.o.GetStrict(opt.StrictJournal)
- checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
- writeBuffer := db.s.o.GetWriteBuffer()
- recoverJournal := func(file storage.File) error {
- db.logf("journal@recovery recovering @%d", file.Num())
- reader, err := file.Open()
- if err != nil {
- return err
- }
- defer reader.Close()
+ var (
+ of storage.File // Obsolete file.
+ rec = &sessionRecord{}
+ )
- // Create/reset journal reader instance.
- if jr == nil {
- jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum)
- } else {
- jr.Reset(reader, dropper{db.s, file}, strict, checksum)
- }
-
- // Flush memdb and remove obsolete journal file.
- if of != nil {
- if mem.Len() > 0 {
- if err := cm.flush(mem, 0); err != nil {
- return err
- }
- }
- if err := cm.commit(file.Num(), db.seq); err != nil {
- return err
- }
- cm.reset()
- of.Remove()
- of = nil
- }
-
- // Replay journal to memdb.
- mem.Reset()
- for {
- r, err := jr.Next()
- if err != nil {
- if err == io.EOF {
- break
- }
- return errors.SetFile(err, file)
- }
-
- buf.Reset()
- if _, err := buf.ReadFrom(r); err != nil {
- if err == io.ErrUnexpectedEOF {
- // This is error returned due to corruption, with strict == false.
- continue
- } else {
- return errors.SetFile(err, file)
- }
- }
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mem); err != nil {
- if strict || !errors.IsCorrupted(err) {
- return errors.SetFile(err, file)
- } else {
- db.s.logf("journal error: %v (skipped)", err)
- // We won't apply sequence number as it might be corrupted.
- continue
- }
- }
-
- // Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
-
- // Flush it if large enough.
- if mem.Size() >= writeBuffer {
- if err := cm.flush(mem, 0); err != nil {
- return err
- }
- mem.Reset()
- }
- }
-
- of = file
- return nil
- }
-
- // Recover all journals.
- if len(journalFiles) > 0 {
- db.logf("journal@recovery F·%d", len(journalFiles))
+ // Recover journals.
+ if len(recJournalFiles) > 0 {
+ db.logf("journal@recovery F·%d", len(recJournalFiles))
// Mark file number as used.
- db.s.markFileNum(journalFiles[len(journalFiles)-1].Num())
+ db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num())
- mem = memdb.New(db.s.icmp, writeBuffer)
- for _, file := range journalFiles {
- if err := recoverJournal(file); err != nil {
+ var (
+ // Options.
+ strict = db.s.o.GetStrict(opt.StrictJournal)
+ checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
+ writeBuffer = db.s.o.GetWriteBuffer()
+
+ jr *journal.Reader
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ buf = &util.Buffer{}
+ batch = &Batch{}
+ )
+
+ for _, jf := range recJournalFiles {
+ db.logf("journal@recovery recovering @%d", jf.Num())
+
+ err := func() error {
+ fr, err := jf.Open()
+ if err != nil {
+ return err
+ }
+ defer fr.Close()
+
+ // Create or reset journal reader instance.
+ if jr == nil {
+ jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+ } else {
+ jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+ }
+
+ // Flush memdb and remove obsolete journal file.
+ if of != nil {
+ if mdb.Len() > 0 {
+ if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
+ return err
+ }
+ }
+ rec.setJournalNum(jf.Num())
+ rec.setSeqNum(db.seq)
+ if err := db.s.commit(rec); err != nil {
+ return err
+ }
+ rec.resetAddedTables()
+ of.Remove()
+ of = nil
+ }
+
+ // Replay journal to memdb.
+ mdb.Reset()
+ for {
+ r, err := jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return errors.SetFile(err, jf)
+ }
+
+ buf.Reset()
+ if _, err := buf.ReadFrom(r); err != nil {
+ if err == io.ErrUnexpectedEOF {
+					// This is an error returned due to corruption, with strict == false.
+ continue
+ } else {
+ return errors.SetFile(err, jf)
+ }
+ }
+ if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ if strict || !errors.IsCorrupted(err) {
+ return errors.SetFile(err, jf)
+ } else {
+ db.s.logf("journal error: %v (skipped)", err)
+						// We won't apply the sequence number as it might be corrupted.
+ continue
+ }
+ }
+
+ // Save sequence number.
+ db.seq = batch.seq + uint64(batch.Len())
+
+ // Flush it if large enough.
+ if mdb.Size() >= writeBuffer {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
+ return err
+ }
+ mdb.Reset()
+ }
+ }
+
+ of = jf
+ return nil
+ }()
+ if err != nil {
return err
}
}
- // Flush the last journal.
- if mem.Len() > 0 {
- if err := cm.flush(mem, 0); err != nil {
+ // Flush the last memdb.
+ if mdb.Len() > 0 {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
return err
}
}
@@ -587,8 +587,10 @@
}
// Commit.
- if err := cm.commit(db.journalFile.Num(), db.seq); err != nil {
- // Close journal.
+ rec.setJournalNum(db.journalFile.Num())
+ rec.setSeqNum(db.seq)
+ if err := db.s.commit(rec); err != nil {
+ // Close journal on error.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
@@ -614,7 +616,7 @@
}
defer m.decref()
- mk, mv, me := m.mdb.Find(ikey)
+ mk, mv, me := m.Find(ikey)
if me == nil {
ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil {
@@ -652,7 +654,7 @@
}
defer m.decref()
- mk, _, me := m.mdb.Find(ikey)
+ mk, _, me := m.Find(ikey)
if me == nil {
ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil {
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 447407a..3b4c469 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -11,7 +11,6 @@
"time"
"github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
@@ -62,53 +61,6 @@
}
}
-type cMem struct {
- s *session
- level int
- rec *sessionRecord
-}
-
-func newCMem(s *session) *cMem {
- return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}}
-}
-
-func (c *cMem) flush(mem *memdb.DB, level int) error {
- s := c.s
-
- // Write memdb to table.
- iter := mem.NewIterator(nil)
- defer iter.Release()
- t, n, err := s.tops.createFrom(iter)
- if err != nil {
- return err
- }
-
- // Pick level.
- if level < 0 {
- v := s.version()
- level = v.pickLevel(t.imin.ukey(), t.imax.ukey())
- v.release()
- }
- c.rec.addTableFile(level, t)
-
- s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
-
- c.level = level
- return nil
-}
-
-func (c *cMem) reset() {
- c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()}
-}
-
-func (c *cMem) commit(journal, seq uint64) error {
- c.rec.setJournalNum(journal)
- c.rec.setSeqNum(seq)
-
- // Commit changes.
- return c.s.commit(c.rec)
-}
-
func (db *DB) compactionError() {
var (
err error
@@ -287,21 +239,18 @@
}
func (db *DB) memCompaction() {
- mem := db.getFrozenMem()
- if mem == nil {
+ mdb := db.getFrozenMem()
+ if mdb == nil {
return
}
- defer mem.decref()
+ defer mdb.decref()
- c := newCMem(db.s)
- stats := new(cStatsStaging)
-
- db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
+ db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
// Don't compact empty memdb.
- if mem.mdb.Len() == 0 {
- db.logf("mem@flush skipping")
- // drop frozen mem
+ if mdb.Len() == 0 {
+ db.logf("memdb@flush skipping")
+ // drop frozen memdb
db.dropFrozenMem()
return
}
@@ -317,13 +266,20 @@
return
}
- db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) {
+ var (
+ rec = &sessionRecord{}
+ stats = &cStatsStaging{}
+ flushLevel int
+ )
+
+ db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
- defer stats.stopTimer()
- return c.flush(mem.mdb, -1)
+ flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1)
+ stats.stopTimer()
+ return
}, func() error {
- for _, r := range c.rec.addedTables {
- db.logf("mem@flush revert @%d", r.num)
+ for _, r := range rec.addedTables {
+ db.logf("memdb@flush revert @%d", r.num)
f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil {
return err
@@ -332,20 +288,23 @@
return nil
})
- db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) {
+ db.compactionTransactFunc("memdb@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
- defer stats.stopTimer()
- return c.commit(db.journalFile.Num(), db.frozenSeq)
+ rec.setJournalNum(db.journalFile.Num())
+ rec.setSeqNum(db.frozenSeq)
+ err = db.s.commit(rec)
+ stats.stopTimer()
+ return
}, nil)
- db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration)
+ db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
- for _, r := range c.rec.addedTables {
+ for _, r := range rec.addedTables {
stats.write += r.size
}
- db.compStats[c.level].add(stats)
+ db.compStats[flushLevel].add(stats)
- // Drop frozen mem.
+ // Drop frozen memdb.
db.dropFrozenMem()
// Resume table compaction.
@@ -557,7 +516,7 @@
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
defer c.release()
- rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()}
+ rec := &sessionRecord{}
rec.addCompPtr(c.level, c.imax)
if !noTrivial && c.trivial() {
diff --git a/leveldb/db_iter.go b/leveldb/db_iter.go
index 011a94a..656ae98 100644
--- a/leveldb/db_iter.go
+++ b/leveldb/db_iter.go
@@ -40,11 +40,11 @@
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
- emi := em.mdb.NewIterator(slice)
+ emi := em.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
- fmi := fm.mdb.NewIterator(slice)
+ fmi := fm.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}
diff --git a/leveldb/db_state.go b/leveldb/db_state.go
index d4db9d6..24671dd 100644
--- a/leveldb/db_state.go
+++ b/leveldb/db_state.go
@@ -15,8 +15,8 @@
)
type memDB struct {
- db *DB
- mdb *memdb.DB
+ db *DB
+ *memdb.DB
ref int32
}
@@ -27,12 +27,12 @@
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
// Only put back memdb with std capacity.
- if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
- m.mdb.Reset()
- m.db.mpoolPut(m.mdb)
+ if m.Capacity() == m.db.s.o.GetWriteBuffer() {
+ m.Reset()
+ m.db.mpoolPut(m.DB)
}
m.db = nil
- m.mdb = nil
+ m.DB = nil
} else if ref < 0 {
panic("negative memdb ref")
}
@@ -126,7 +126,7 @@
}
mem = &memDB{
db: db,
- mdb: mdb,
+ DB: mdb,
ref: 2,
}
db.mem = mem
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 38bfbf1..2f60d17 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2445,7 +2445,7 @@
if err != nil {
t.Fatal(err)
}
- rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+ rec := &sessionRecord{}
rec.addTableFile(i, tf)
if err := s.commit(rec); err != nil {
t.Fatal(err)
@@ -2455,7 +2455,7 @@
// Build grandparent.
v := s.version()
c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
- rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+ rec := &sessionRecord{}
b := &tableCompactionBuilder{
s: s,
c: c,
@@ -2479,7 +2479,7 @@
// Build level-1.
v = s.version()
c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
- rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+ rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
@@ -2523,7 +2523,7 @@
// Compaction with transient error.
v = s.version()
c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
- rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+ rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index e1cf30c..1e412b3 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -63,24 +63,24 @@
return
}
-func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
+func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
flush := func() (retry bool) {
v := db.s.version()
defer v.release()
- mem = db.getEffectiveMem()
+ mdb = db.getEffectiveMem()
defer func() {
if retry {
- mem.decref()
- mem = nil
+ mdb.decref()
+ mdb = nil
}
}()
- nn = mem.mdb.Free()
+ mdbFree = mdb.Free()
switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
delayed = true
time.Sleep(time.Millisecond)
- case nn >= n:
+ case mdbFree >= n:
return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
delayed = true
@@ -90,15 +90,15 @@
}
default:
// Allow memdb to grow if it has no entry.
- if mem.mdb.Len() == 0 {
- nn = n
+ if mdb.Len() == 0 {
+ mdbFree = n
} else {
- mem.decref()
- mem, err = db.rotateMem(n)
+ mdb.decref()
+ mdb, err = db.rotateMem(n)
if err == nil {
- nn = mem.mdb.Free()
+ mdbFree = mdb.Free()
} else {
- nn = 0
+ mdbFree = 0
}
}
return false
@@ -157,18 +157,18 @@
}
}()
- mem, memFree, err := db.flush(b.size())
+ mdb, mdbFree, err := db.flush(b.size())
if err != nil {
return
}
- defer mem.decref()
+ defer mdb.decref()
// Calculate maximum size of the batch.
m := 1 << 20
if x := b.size(); x <= 128<<10 {
m = x + (128 << 10)
}
- m = minInt(m, memFree)
+ m = minInt(m, mdbFree)
// Merge with other batch.
drain:
@@ -197,7 +197,7 @@
select {
case db.journalC <- b:
// Write into memdb
- if berr := b.memReplay(mem.mdb); berr != nil {
+ if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
case err = <-db.compPerErrC:
@@ -211,7 +211,7 @@
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
- if berr := b.revertMemReplay(mem.mdb); berr != nil {
+ if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr)
}
return
@@ -225,7 +225,7 @@
if err != nil {
return
}
- if berr := b.memReplay(mem.mdb); berr != nil {
+ if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
}
@@ -233,7 +233,7 @@
// Set last seq number.
db.addSeq(uint64(b.Len()))
- if b.size() >= memFree {
+ if b.size() >= mdbFree {
db.rotateMem(0)
}
return
@@ -290,9 +290,9 @@
}
// Check for overlaps in memdb.
- mem := db.getEffectiveMem()
- defer mem.decref()
- if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
+ mdb := db.getEffectiveMem()
+ defer mdb.decref()
+ if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC
diff --git a/leveldb/session.go b/leveldb/session.go
index b3906f7..f0bba46 100644
--- a/leveldb/session.go
+++ b/leveldb/session.go
@@ -11,10 +11,8 @@
"io"
"os"
"sync"
- "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
@@ -127,11 +125,16 @@
return
}
defer reader.Close()
- strict := s.o.GetStrict(opt.StrictManifest)
- jr := journal.NewReader(reader, dropper{s, m}, strict, true)
- staging := s.stVersion.newStaging()
- rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
+ var (
+ // Options.
+ numLevel = s.o.GetNumLevel()
+ strict = s.o.GetStrict(opt.StrictManifest)
+
+ jr = journal.NewReader(reader, dropper{s, m}, strict, true)
+ rec = &sessionRecord{}
+ staging = s.stVersion.newStaging()
+ )
for {
var r io.Reader
r, err = jr.Next()
@@ -143,7 +146,7 @@
return errors.SetFile(err, m)
}
- err = rec.decode(r)
+ err = rec.decode(r, numLevel)
if err == nil {
// save compact pointers
for _, r := range rec.compPtrs {
@@ -206,250 +209,3 @@
return
}
-
-// Pick a compaction based on current state; need external synchronization.
-func (s *session) pickCompaction() *compaction {
- v := s.version()
-
- var level int
- var t0 tFiles
- if v.cScore >= 1 {
- level = v.cLevel
- cptr := s.stCompPtrs[level]
- tables := v.tables[level]
- for _, t := range tables {
- if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
- t0 = append(t0, t)
- break
- }
- }
- if len(t0) == 0 {
- t0 = append(t0, tables[0])
- }
- } else {
- if p := atomic.LoadPointer(&v.cSeek); p != nil {
- ts := (*tSet)(p)
- level = ts.level
- t0 = append(t0, ts.table)
- } else {
- v.release()
- return nil
- }
- }
-
- return newCompaction(s, v, level, t0)
-}
-
-// Create compaction from given level and range; need external synchronization.
-func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
- v := s.version()
-
- t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
- if len(t0) == 0 {
- v.release()
- return nil
- }
-
- // Avoid compacting too much in one shot in case the range is large.
- // But we cannot do this for level-0 since level-0 files can overlap
- // and we must not pick one file and drop another older file if the
- // two files overlap.
- if level > 0 {
- limit := uint64(v.s.o.GetCompactionSourceLimit(level))
- total := uint64(0)
- for i, t := range t0 {
- total += t.size
- if total >= limit {
- s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
- t0 = t0[:i+1]
- break
- }
- }
- }
-
- return newCompaction(s, v, level, t0)
-}
-
-func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
- c := &compaction{
- s: s,
- v: v,
- level: level,
- tables: [2]tFiles{t0, nil},
- maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
- tPtrs: make([]int, s.o.GetNumLevel()),
- }
- c.expand()
- c.save()
- return c
-}
-
-// compaction represent a compaction state.
-type compaction struct {
- s *session
- v *version
-
- level int
- tables [2]tFiles
- maxGPOverlaps uint64
-
- gp tFiles
- gpi int
- seenKey bool
- gpOverlappedBytes uint64
- imin, imax iKey
- tPtrs []int
- released bool
-
- snapGPI int
- snapSeenKey bool
- snapGPOverlappedBytes uint64
- snapTPtrs []int
-}
-
-func (c *compaction) save() {
- c.snapGPI = c.gpi
- c.snapSeenKey = c.seenKey
- c.snapGPOverlappedBytes = c.gpOverlappedBytes
- c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
-}
-
-func (c *compaction) restore() {
- c.gpi = c.snapGPI
- c.seenKey = c.snapSeenKey
- c.gpOverlappedBytes = c.snapGPOverlappedBytes
- c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
-}
-
-func (c *compaction) release() {
- if !c.released {
- c.released = true
- c.v.release()
- }
-}
-
-// Expand compacted tables; need external synchronization.
-func (c *compaction) expand() {
- limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
- vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
-
- t0, t1 := c.tables[0], c.tables[1]
- imin, imax := t0.getRange(c.s.icmp)
- // We expand t0 here just incase ukey hop across tables.
- t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
- if len(t0) != len(c.tables[0]) {
- imin, imax = t0.getRange(c.s.icmp)
- }
- t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
- // Get entire range covered by compaction.
- amin, amax := append(t0, t1...).getRange(c.s.icmp)
-
- // See if we can grow the number of inputs in "level" without
- // changing the number of "level+1" files we pick up.
- if len(t1) > 0 {
- exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
- if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
- xmin, xmax := exp0.getRange(c.s.icmp)
- exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
- if len(exp1) == len(t1) {
- c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
- c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
- len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
- imin, imax = xmin, xmax
- t0, t1 = exp0, exp1
- amin, amax = append(t0, t1...).getRange(c.s.icmp)
- }
- }
- }
-
- // Compute the set of grandparent files that overlap this compaction
- // (parent == level+1; grandparent == level+2)
- if c.level+2 < c.s.o.GetNumLevel() {
- c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
- }
-
- c.tables[0], c.tables[1] = t0, t1
- c.imin, c.imax = imin, imax
-}
-
-// Check whether compaction is trivial.
-func (c *compaction) trivial() bool {
- return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
-}
-
-func (c *compaction) baseLevelForKey(ukey []byte) bool {
- for level, tables := range c.v.tables[c.level+2:] {
- for c.tPtrs[level] < len(tables) {
- t := tables[c.tPtrs[level]]
- if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
- // We've advanced far enough.
- if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
- // Key falls in this file's range, so definitely not base level.
- return false
- }
- break
- }
- c.tPtrs[level]++
- }
- }
- return true
-}
-
-func (c *compaction) shouldStopBefore(ikey iKey) bool {
- for ; c.gpi < len(c.gp); c.gpi++ {
- gp := c.gp[c.gpi]
- if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
- break
- }
- if c.seenKey {
- c.gpOverlappedBytes += gp.size
- }
- }
- c.seenKey = true
-
- if c.gpOverlappedBytes > c.maxGPOverlaps {
- // Too much overlap for current output; start new output.
- c.gpOverlappedBytes = 0
- return true
- }
- return false
-}
-
-// Creates an iterator.
-func (c *compaction) newIterator() iterator.Iterator {
- // Creates iterator slice.
- icap := len(c.tables)
- if c.level == 0 {
- // Special case for level-0
- icap = len(c.tables[0]) + 1
- }
- its := make([]iterator.Iterator, 0, icap)
-
- // Options.
- ro := &opt.ReadOptions{
- DontFillCache: true,
- Strict: opt.StrictOverride,
- }
- strict := c.s.o.GetStrict(opt.StrictCompaction)
- if strict {
- ro.Strict |= opt.StrictReader
- }
-
- for i, tables := range c.tables {
- if len(tables) == 0 {
- continue
- }
-
- // Level-0 is not sorted and may overlaps each other.
- if c.level+i == 0 {
- for _, t := range tables {
- its = append(its, c.s.tops.newIterator(t, nil, ro))
- }
- } else {
- it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
- its = append(its, it)
- }
- }
-
- return iterator.NewMergedIterator(its, c.s.icmp, strict)
-}
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
new file mode 100644
index 0000000..7c5a794
--- /dev/null
+++ b/leveldb/session_compaction.go
@@ -0,0 +1,287 @@
+// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
+// All rights reserved.
+//
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package leveldb
+
+import (
+ "sync/atomic"
+
+ "github.com/syndtr/goleveldb/leveldb/iterator"
+ "github.com/syndtr/goleveldb/leveldb/memdb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+func (s *session) pickMemdbLevel(umin, umax []byte) int {
+ v := s.version()
+ defer v.release()
+ return v.pickMemdbLevel(umin, umax)
+}
+
+func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) {
+ // Create sorted table.
+ iter := mdb.NewIterator(nil)
+ defer iter.Release()
+ t, n, err := s.tops.createFrom(iter)
+ if err != nil {
+ return level, err
+ }
+
+ // Pick level and add to record.
+ if level < 0 {
+ level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey())
+ }
+ rec.addTableFile(level, t)
+
+ s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
+ return level, nil
+}
+
+// Pick a compaction based on current state; need external synchronization.
+func (s *session) pickCompaction() *compaction {
+ v := s.version()
+
+ var level int
+ var t0 tFiles
+ if v.cScore >= 1 {
+ level = v.cLevel
+ cptr := s.stCompPtrs[level]
+ tables := v.tables[level]
+ for _, t := range tables {
+ if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
+ t0 = append(t0, t)
+ break
+ }
+ }
+ if len(t0) == 0 {
+ t0 = append(t0, tables[0])
+ }
+ } else {
+ if p := atomic.LoadPointer(&v.cSeek); p != nil {
+ ts := (*tSet)(p)
+ level = ts.level
+ t0 = append(t0, ts.table)
+ } else {
+ v.release()
+ return nil
+ }
+ }
+
+ return newCompaction(s, v, level, t0)
+}
+
+// Create compaction from given level and range; need external synchronization.
+func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
+ v := s.version()
+
+ t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
+ if len(t0) == 0 {
+ v.release()
+ return nil
+ }
+
+ // Avoid compacting too much in one shot in case the range is large.
+ // But we cannot do this for level-0 since level-0 files can overlap
+ // and we must not pick one file and drop another older file if the
+ // two files overlap.
+ if level > 0 {
+ limit := uint64(v.s.o.GetCompactionSourceLimit(level))
+ total := uint64(0)
+ for i, t := range t0 {
+ total += t.size
+ if total >= limit {
+ s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
+ t0 = t0[:i+1]
+ break
+ }
+ }
+ }
+
+ return newCompaction(s, v, level, t0)
+}
+
+func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
+ c := &compaction{
+ s: s,
+ v: v,
+ level: level,
+ tables: [2]tFiles{t0, nil},
+ maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
+ tPtrs: make([]int, s.o.GetNumLevel()),
+ }
+ c.expand()
+ c.save()
+ return c
+}
+
+// compaction represent a compaction state.
+type compaction struct {
+ s *session
+ v *version
+
+ level int
+ tables [2]tFiles
+ maxGPOverlaps uint64
+
+ gp tFiles
+ gpi int
+ seenKey bool
+ gpOverlappedBytes uint64
+ imin, imax iKey
+ tPtrs []int
+ released bool
+
+ snapGPI int
+ snapSeenKey bool
+ snapGPOverlappedBytes uint64
+ snapTPtrs []int
+}
+
+func (c *compaction) save() {
+ c.snapGPI = c.gpi
+ c.snapSeenKey = c.seenKey
+ c.snapGPOverlappedBytes = c.gpOverlappedBytes
+ c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
+}
+
+func (c *compaction) restore() {
+ c.gpi = c.snapGPI
+ c.seenKey = c.snapSeenKey
+ c.gpOverlappedBytes = c.snapGPOverlappedBytes
+ c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
+}
+
+func (c *compaction) release() {
+ if !c.released {
+ c.released = true
+ c.v.release()
+ }
+}
+
+// Expand compacted tables; need external synchronization.
+func (c *compaction) expand() {
+ limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
+ vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
+
+ t0, t1 := c.tables[0], c.tables[1]
+ imin, imax := t0.getRange(c.s.icmp)
+	// We expand t0 here just in case ukey hops across tables.
+ t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
+ if len(t0) != len(c.tables[0]) {
+ imin, imax = t0.getRange(c.s.icmp)
+ }
+ t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
+ // Get entire range covered by compaction.
+ amin, amax := append(t0, t1...).getRange(c.s.icmp)
+
+ // See if we can grow the number of inputs in "level" without
+ // changing the number of "level+1" files we pick up.
+ if len(t1) > 0 {
+ exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
+ if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
+ xmin, xmax := exp0.getRange(c.s.icmp)
+ exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
+ if len(exp1) == len(t1) {
+ c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
+ c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
+ len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
+ imin, imax = xmin, xmax
+ t0, t1 = exp0, exp1
+ amin, amax = append(t0, t1...).getRange(c.s.icmp)
+ }
+ }
+ }
+
+ // Compute the set of grandparent files that overlap this compaction
+ // (parent == level+1; grandparent == level+2)
+ if c.level+2 < c.s.o.GetNumLevel() {
+ c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
+ }
+
+ c.tables[0], c.tables[1] = t0, t1
+ c.imin, c.imax = imin, imax
+}
+
+// Check whether compaction is trivial.
+func (c *compaction) trivial() bool {
+ return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
+}
+
+func (c *compaction) baseLevelForKey(ukey []byte) bool {
+ for level, tables := range c.v.tables[c.level+2:] {
+ for c.tPtrs[level] < len(tables) {
+ t := tables[c.tPtrs[level]]
+ if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
+ // We've advanced far enough.
+ if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
+ // Key falls in this file's range, so definitely not base level.
+ return false
+ }
+ break
+ }
+ c.tPtrs[level]++
+ }
+ }
+ return true
+}
+
+func (c *compaction) shouldStopBefore(ikey iKey) bool {
+ for ; c.gpi < len(c.gp); c.gpi++ {
+ gp := c.gp[c.gpi]
+ if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
+ break
+ }
+ if c.seenKey {
+ c.gpOverlappedBytes += gp.size
+ }
+ }
+ c.seenKey = true
+
+ if c.gpOverlappedBytes > c.maxGPOverlaps {
+ // Too much overlap for current output; start new output.
+ c.gpOverlappedBytes = 0
+ return true
+ }
+ return false
+}
+
+// Creates an iterator.
+func (c *compaction) newIterator() iterator.Iterator {
+ // Creates iterator slice.
+ icap := len(c.tables)
+ if c.level == 0 {
+ // Special case for level-0.
+ icap = len(c.tables[0]) + 1
+ }
+ its := make([]iterator.Iterator, 0, icap)
+
+ // Options.
+ ro := &opt.ReadOptions{
+ DontFillCache: true,
+ Strict: opt.StrictOverride,
+ }
+ strict := c.s.o.GetStrict(opt.StrictCompaction)
+ if strict {
+ ro.Strict |= opt.StrictReader
+ }
+
+ for i, tables := range c.tables {
+ if len(tables) == 0 {
+ continue
+ }
+
+		// Level-0 is not sorted and may overlap each other.
+ if c.level+i == 0 {
+ for _, t := range tables {
+ its = append(its, c.s.tops.newIterator(t, nil, ro))
+ }
+ } else {
+ it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
+ its = append(its, it)
+ }
+ }
+
+ return iterator.NewMergedIterator(its, c.s.icmp, strict)
+}
diff --git a/leveldb/session_record.go b/leveldb/session_record.go
index 1bdcc68..405e07b 100644
--- a/leveldb/session_record.go
+++ b/leveldb/session_record.go
@@ -52,8 +52,6 @@
}
type sessionRecord struct {
- numLevel int
-
hasRec int
comparer string
journalNum uint64
@@ -230,7 +228,7 @@
return x
}
-func (p *sessionRecord) readLevel(field string, r io.ByteReader) int {
+func (p *sessionRecord) readLevel(field string, r io.ByteReader, numLevel int) int {
if p.err != nil {
return 0
}
@@ -238,14 +236,14 @@
if p.err != nil {
return 0
}
- if x >= uint64(p.numLevel) {
+ if x >= uint64(numLevel) {
p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"})
return 0
}
return int(x)
}
-func (p *sessionRecord) decode(r io.Reader) error {
+func (p *sessionRecord) decode(r io.Reader, numLevel int) error {
br, ok := r.(byteReader)
if !ok {
br = bufio.NewReader(r)
@@ -286,13 +284,13 @@
p.setSeqNum(x)
}
case recCompPtr:
- level := p.readLevel("comp-ptr.level", br)
+ level := p.readLevel("comp-ptr.level", br, numLevel)
ikey := p.readBytes("comp-ptr.ikey", br)
if p.err == nil {
p.addCompPtr(level, iKey(ikey))
}
case recAddTable:
- level := p.readLevel("add-table.level", br)
+ level := p.readLevel("add-table.level", br, numLevel)
num := p.readUvarint("add-table.num", br)
size := p.readUvarint("add-table.size", br)
imin := p.readBytes("add-table.imin", br)
@@ -301,7 +299,7 @@
p.addTable(level, num, size, imin, imax)
}
case recDelTable:
- level := p.readLevel("del-table.level", br)
+ level := p.readLevel("del-table.level", br, numLevel)
num := p.readUvarint("del-table.num", br)
if p.err == nil {
p.delTable(level, num)
diff --git a/leveldb/session_record_test.go b/leveldb/session_record_test.go
index c0c035a..33c1487 100644
--- a/leveldb/session_record_test.go
+++ b/leveldb/session_record_test.go
@@ -19,8 +19,8 @@
if err != nil {
return
}
- v2 := &sessionRecord{numLevel: opt.DefaultNumLevel}
- err = v.decode(b)
+ v2 := &sessionRecord{}
+ err = v.decode(b, opt.DefaultNumLevel)
if err != nil {
return
}
@@ -34,7 +34,7 @@
func TestSessionRecord_EncodeDecode(t *testing.T) {
big := uint64(1) << 50
- v := &sessionRecord{numLevel: opt.DefaultNumLevel}
+ v := &sessionRecord{}
i := uint64(0)
test := func() {
res, err := decodeEncode(v)
diff --git a/leveldb/session_util.go b/leveldb/session_util.go
index 007c02c..399a788 100644
--- a/leveldb/session_util.go
+++ b/leveldb/session_util.go
@@ -182,7 +182,7 @@
defer v.release()
}
if rec == nil {
- rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
+ rec = &sessionRecord{}
}
s.fillRecord(rec, true)
v.fillRecord(rec)
diff --git a/leveldb/version.go b/leveldb/version.go
index 88a52f5..011d982 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -300,7 +300,7 @@
return
}
-func (v *version) pickLevel(umin, umax []byte) (level int) {
+func (v *version) pickMemdbLevel(umin, umax []byte) (level int) {
if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
var overlaps tFiles
maxLevel := v.s.o.GetMaxMemCompationLevel()