| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "sync" |
| "time" |
| |
| "github.com/syndtr/goleveldb/leveldb/errors" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/storage" |
| ) |
| |
// errCompactionTransactExiting is the panic value used by
// compactionExitTransact to abort a compaction transaction. The compaction
// goroutines recover this exact value and treat it as a normal exit.
var errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
| |
// cStat holds the accumulated compaction statistics of a single level.
type cStat struct {
	duration time.Duration
	read     int64
	write    int64
}

// add folds the time and byte counts collected in n into p.
func (p *cStat) add(n *cStatStaging) {
	*p = cStat{
		duration: p.duration + n.duration,
		read:     p.read + n.read,
		write:    p.write + n.write,
	}
}

// get returns the accumulated duration and read/write byte counts.
func (p *cStat) get() (duration time.Duration, read, write int64) {
	duration, read, write = p.duration, p.read, p.write
	return
}

// cStatStaging collects the statistics of one compaction before they are
// folded into cStats. The timer may be started and stopped repeatedly. Only
// the time spent between a start and the following stop is added to duration.
type cStatStaging struct {
	start    time.Time
	duration time.Duration
	on       bool
	read     int64
	write    int64
}

// startTimer opens a timed interval. It does nothing if one is already open.
func (p *cStatStaging) startTimer() {
	if p.on {
		return
	}
	p.start, p.on = time.Now(), true
}

// stopTimer closes the open interval and adds its length to duration. It does
// nothing if no interval is open.
func (p *cStatStaging) stopTimer() {
	if !p.on {
		return
	}
	p.duration += time.Since(p.start)
	p.on = false
}

// cStats is a mutex-guarded per-level table of cStat that grows on demand.
type cStats struct {
	lk    sync.Mutex
	stats []cStat
}

// addStat adds n to the statistics of level. The table is extended first if
// level is beyond its current end.
func (p *cStats) addStat(level int, n *cStatStaging) {
	p.lk.Lock()
	if len(p.stats) <= level {
		grown := make([]cStat, level+1)
		copy(grown, p.stats)
		p.stats = grown
	}
	p.stats[level].add(n)
	p.lk.Unlock()
}

// getStat returns the statistics recorded for level. For a level that was
// never recorded, all values are zero.
func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
	p.lk.Lock()
	defer p.lk.Unlock()
	if level >= len(p.stats) {
		return
	}
	return p.stats[level].get()
}
| |
// compactionError is a long-running loop that tracks the current compaction
// error status and publishes it to other goroutines. It is a three-state
// machine driven by the statuses received on db.compErrSetC:
//
//   - noerr: no error. Only waits for a new status or for close.
//   - haserr: transient error. err is offered on db.compErrC until a new
//     status arrives.
//   - hasperr: persistent error (ErrReadOnly or a corruption error). err is
//     offered on both db.compErrC and db.compPerErrC, and the loop also sends
//     on db.writeLockC so that writes cannot proceed. This state is never
//     left, because nothing more is received from db.compErrSetC.
//
// A nil status moves to noerr. ErrReadOnly or a corruption error moves to
// hasperr. Any other error moves to, or stays in, haserr with the new value.
// The function returns once db.closeC becomes readable.
func (db *DB) compactionError() {
	var err error
noerr:
	// No error.
	for {
		select {
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				// Still no error; keep waiting.
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				goto haserr
			}
		case _, _ = <-db.closeC:
			return
		}
	}
haserr:
	// Transient error.
	for {
		select {
		case db.compErrC <- err:
			// A waiter picked up the current error; keep offering it.
		case err = <-db.compErrSetC:
			switch {
			case err == nil:
				goto noerr
			case err == ErrReadOnly, errors.IsCorrupted(err):
				goto hasperr
			default:
				// Another transient error: err already holds the newer value,
				// which is what gets offered from now on.
			}
		case _, _ = <-db.closeC:
			return
		}
	}
hasperr:
	// Persistent error.
	for {
		select {
		case db.compErrC <- err:
		case db.compPerErrC <- err:
		case db.writeLockC <- struct{}{}:
			// Hold write lock, so that write won't pass-through.
			// NOTE(review): this case stays enabled on later iterations.
			// Presumably writeLockC is a 1-slot semaphore, so further sends
			// block and the lock is taken at most once. Confirm against
			// writeLockC's allocation.
			db.compWriteLocking = true
		case _, _ = <-db.closeC:
			if db.compWriteLocking {
				// We should release the lock or Close will hang.
				<-db.writeLockC
			}
			return
		}
	}
}
| |
// compactionTransactCounter measures how far one attempt of a compaction
// transaction got. compactionTransact resets its retry backoff when an
// attempt's count exceeds that of every earlier attempt.
type compactionTransactCounter int

// incr records one more unit of progress.
func (c *compactionTransactCounter) incr() {
	(*c)++
}

// compactionTransactInterface is a unit of compaction work that
// compactionTransact can retry. run performs one attempt and reports its
// progress through cnt. revert is called when the transaction is abandoned.
type compactionTransactInterface interface {
	revert() error
	run(cnt *compactionTransactCounter) error
}
| |
| func (db *DB) compactionTransact(name string, t compactionTransactInterface) { |
| defer func() { |
| if x := recover(); x != nil { |
| if x == errCompactionTransactExiting { |
| if err := t.revert(); err != nil { |
| db.logf("%s revert error %q", name, err) |
| } |
| } |
| panic(x) |
| } |
| }() |
| |
| const ( |
| backoffMin = 1 * time.Second |
| backoffMax = 8 * time.Second |
| backoffMul = 2 * time.Second |
| ) |
| var ( |
| backoff = backoffMin |
| backoffT = time.NewTimer(backoff) |
| lastCnt = compactionTransactCounter(0) |
| |
| disableBackoff = db.s.o.GetDisableCompactionBackoff() |
| ) |
| for n := 0; ; n++ { |
| // Check whether the DB is closed. |
| if db.isClosed() { |
| db.logf("%s exiting", name) |
| db.compactionExitTransact() |
| } else if n > 0 { |
| db.logf("%s retrying N·%d", name, n) |
| } |
| |
| // Execute. |
| cnt := compactionTransactCounter(0) |
| err := t.run(&cnt) |
| if err != nil { |
| db.logf("%s error I·%d %q", name, cnt, err) |
| } |
| |
| // Set compaction error status. |
| select { |
| case db.compErrSetC <- err: |
| case perr := <-db.compPerErrC: |
| if err != nil { |
| db.logf("%s exiting (persistent error %q)", name, perr) |
| db.compactionExitTransact() |
| } |
| case _, _ = <-db.closeC: |
| db.logf("%s exiting", name) |
| db.compactionExitTransact() |
| } |
| if err == nil { |
| return |
| } |
| if errors.IsCorrupted(err) { |
| db.logf("%s exiting (corruption detected)", name) |
| db.compactionExitTransact() |
| } |
| |
| if !disableBackoff { |
| // Reset backoff duration if counter is advancing. |
| if cnt > lastCnt { |
| backoff = backoffMin |
| lastCnt = cnt |
| } |
| |
| // Backoff. |
| backoffT.Reset(backoff) |
| if backoff < backoffMax { |
| backoff *= backoffMul |
| if backoff > backoffMax { |
| backoff = backoffMax |
| } |
| } |
| select { |
| case <-backoffT.C: |
| case _, _ = <-db.closeC: |
| db.logf("%s exiting", name) |
| db.compactionExitTransact() |
| } |
| } |
| } |
| } |
| |
| type compactionTransactFunc struct { |
| runFunc func(cnt *compactionTransactCounter) error |
| revertFunc func() error |
| } |
| |
| func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error { |
| return t.runFunc(cnt) |
| } |
| |
| func (t *compactionTransactFunc) revert() error { |
| if t.revertFunc != nil { |
| return t.revertFunc() |
| } |
| return nil |
| } |
| |
| func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) { |
| db.compactionTransact(name, &compactionTransactFunc{run, revert}) |
| } |
| |
| func (db *DB) compactionExitTransact() { |
| panic(errCompactionTransactExiting) |
| } |
| |
| func (db *DB) compactionCommit(name string, rec *sessionRecord) { |
| db.compCommitLk.Lock() |
| defer db.compCommitLk.Unlock() // Defer is necessary. |
| db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { |
| return db.s.commit(rec) |
| }, nil) |
| } |
| |
// memCompaction flushes the frozen memdb, if any, to table files, commits
// the resulting session record, drops the frozen memdb and finally triggers
// a non-waiting table compaction. An empty frozen memdb is dropped without
// flushing.
//
// Table compaction is paused for the duration of the flush. resumeC is
// handed to the table compaction goroutine through db.tcompPauseC. That
// goroutine (see pauseCompaction) then blocks sending on resumeC until this
// function receives from it after the commit. If a persistent compaction
// error is reported instead, the flush proceeds without pausing.
//
// The flush and commit run under compactionTransact, so they may abort by
// panicking with errCompactionTransactExiting.
func (db *DB) memCompaction() {
	mdb := db.getFrozenMem()
	if mdb == nil {
		return
	}
	// Deferred so it also runs when a transaction aborts by panicking.
	defer mdb.decref()

	db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))

	// Don't compact empty memdb.
	if mdb.Len() == 0 {
		db.logf("memdb@flush skipping")
		// drop frozen memdb
		db.dropFrozenMem()
		return
	}

	// Pause table compaction.
	resumeC := make(chan struct{})
	select {
	case db.tcompPauseC <- (chan<- struct{})(resumeC):
	case <-db.compPerErrC:
		// A persistent error is pending: flush without pausing. resumeC is
		// set to nil so the resume step below is skipped.
		close(resumeC)
		resumeC = nil
	case _, _ = <-db.closeC:
		return
	}

	var (
		rec        = &sessionRecord{}
		stats      = &cStatStaging{}
		flushLevel int // level returned by flushMemdb; the stats are recorded under it
	)

	// Generate tables. If the transaction is abandoned, the revert function
	// removes every table file already recorded in rec.
	db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
		stats.startTimer()
		flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
		stats.stopTimer()
		return
	}, func() error {
		for _, r := range rec.addedTables {
			db.logf("memdb@flush revert @%d", r.num)
			if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
				return err
			}
		}
		return nil
	})

	// Record the current journal number and the frozen sequence number in
	// the same commit.
	rec.setJournalNum(db.journalFd.Num)
	rec.setSeqNum(db.frozenSeq)

	// Commit.
	stats.startTimer()
	db.compactionCommit("memdb", rec)
	stats.stopTimer()

	db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)

	for _, r := range rec.addedTables {
		stats.write += r.size
	}
	db.compStats.addStat(flushLevel, stats)

	// Drop frozen memdb.
	db.dropFrozenMem()

	// Resume table compaction: receive the value the paused table compaction
	// goroutine is sending, then close the channel.
	if resumeC != nil {
		select {
		case <-resumeC:
			close(resumeC)
		case _, _ = <-db.closeC:
			return
		}
	}

	// Trigger table compaction.
	db.compTrigger(db.tcompCmdC)
}
| |
| type tableCompactionBuilder struct { |
| db *DB |
| s *session |
| c *compaction |
| rec *sessionRecord |
| stat0, stat1 *cStatStaging |
| |
| snapHasLastUkey bool |
| snapLastUkey []byte |
| snapLastSeq uint64 |
| snapIter int |
| snapKerrCnt int |
| snapDropCnt int |
| |
| kerrCnt int |
| dropCnt int |
| |
| minSeq uint64 |
| strict bool |
| tableSize int |
| |
| tw *tWriter |
| } |
| |
| func (b *tableCompactionBuilder) appendKV(key, value []byte) error { |
| // Create new table if not already. |
| if b.tw == nil { |
| // Check for pause event. |
| if b.db != nil { |
| select { |
| case ch := <-b.db.tcompPauseC: |
| b.db.pauseCompaction(ch) |
| case _, _ = <-b.db.closeC: |
| b.db.compactionExitTransact() |
| default: |
| } |
| } |
| |
| // Create new table. |
| var err error |
| b.tw, err = b.s.tops.create() |
| if err != nil { |
| return err |
| } |
| } |
| |
| // Write key/value into table. |
| return b.tw.append(key, value) |
| } |
| |
| func (b *tableCompactionBuilder) needFlush() bool { |
| return b.tw.tw.BytesLen() >= b.tableSize |
| } |
| |
| func (b *tableCompactionBuilder) flush() error { |
| t, err := b.tw.finish() |
| if err != nil { |
| return err |
| } |
| b.rec.addTableFile(b.c.sourceLevel+1, t) |
| b.stat1.write += t.size |
| b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) |
| b.tw = nil |
| return nil |
| } |
| |
| func (b *tableCompactionBuilder) cleanup() { |
| if b.tw != nil { |
| b.tw.drop() |
| b.tw = nil |
| } |
| } |
| |
// run performs one attempt at building the output tables. It implements
// compactionTransactInterface.run, and compactionTransact calls it again
// after a failure.
//
// Finished work is not redone. Right after each output table is flushed,
// the iteration state is saved (b.c.save plus the snap* fields). A retry
// restores it (b.c.restore), skips the first b.snapIter entries of the merged
// iterator and continues from there. The progress counter is incremented for
// every entry visited, including skipped ones.
//
// An entry is dropped when either:
//   (A) the previous entry for the same user key had sequence <= b.minSeq, or
//   (B) it is a deletion marker with seq <= b.minSeq for a user key for
//       which b.c.baseLevelForKey returns true.
//
// Keys that fail to parse are returned as an error in strict mode. Otherwise
// they are kept and counted in b.kerrCnt. Output tables are only rotated
// between different user keys, never within the entries of one user key.
func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
	snapResumed := b.snapIter > 0
	hasLastUkey := b.snapHasLastUkey // The key may have zero length, so this flag is necessary.
	lastUkey := append([]byte{}, b.snapLastUkey...)
	lastSeq := b.snapLastSeq
	b.kerrCnt = b.snapKerrCnt
	b.dropCnt = b.snapDropCnt
	// Restore compaction state.
	b.c.restore()

	// Drop any table this attempt opened but did not flush.
	defer b.cleanup()

	b.stat1.startTimer()
	defer b.stat1.stopTimer()

	iter := b.c.newIterator()
	defer iter.Release()
	for i := 0; iter.Next(); i++ {
		// Incr transact counter.
		cnt.incr()

		// Skip until last state.
		if i < b.snapIter {
			continue
		}

		// resumed is true only for the first entry processed after resuming
		// from a snapshot. shouldStopBefore(ikey) already ran for that entry
		// before the snapshot was taken (b.c.save comes after it), so it is
		// not consulted a second time.
		resumed := false
		if snapResumed {
			resumed = true
			snapResumed = false
		}

		ikey := iter.Key()
		ukey, seq, kt, kerr := parseInternalKey(ikey)

		if kerr == nil {
			shouldStop := !resumed && b.c.shouldStopBefore(ikey)

			if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
				// First occurrence of this user key.

				// Only rotate tables if ukey doesn't hop across.
				if b.tw != nil && (shouldStop || b.needFlush()) {
					if err := b.flush(); err != nil {
						return err
					}

					// Snapshot the state as it was just before entry i, so
					// that a retry resumes by reprocessing entry i.
					b.c.save()
					b.snapHasLastUkey = hasLastUkey
					b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
					b.snapLastSeq = lastSeq
					b.snapIter = i
					b.snapKerrCnt = b.kerrCnt
					b.snapDropCnt = b.dropCnt
				}

				hasLastUkey = true
				lastUkey = append(lastUkey[:0], ukey...)
				// keyMaxSeq ensures rule (A) never drops the first entry of
				// a user key.
				lastSeq = keyMaxSeq
			}

			switch {
			case lastSeq <= b.minSeq:
				// Dropped because a newer entry for the same user key exists.
				fallthrough // (A)
			case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger seq numbers
				// (3) data in layers that are being compacted here and have
				//     smaller seq numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				lastSeq = seq
				b.dropCnt++
				continue
			default:
				lastSeq = seq
			}
		} else {
			if b.strict {
				return kerr
			}

			// Don't drop corrupted keys. Reset the user-key tracking so the
			// next parsable entry is treated as a new user key.
			hasLastUkey = false
			lastUkey = lastUkey[:0]
			lastSeq = keyMaxSeq
			b.kerrCnt++
		}

		if err := b.appendKV(ikey, iter.Value()); err != nil {
			return err
		}
	}

	if err := iter.Error(); err != nil {
		return err
	}

	// Finish last table.
	if b.tw != nil && !b.tw.empty() {
		return b.flush()
	}
	return nil
}
| |
| func (b *tableCompactionBuilder) revert() error { |
| for _, at := range b.rec.addedTables { |
| b.s.logf("table@build revert @%d", at.num) |
| if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
// tableCompaction executes compaction c and commits the result. c is always
// released on return.
//
// If noTrivial is false and c is trivial, the table c.levels[0][0] is moved
// to level c.sourceLevel+1 by a commit alone, without being rewritten.
//
// Otherwise:
//   - every input table is recorded as deleted;
//   - a tableCompactionBuilder, run under compactionTransact, merges the
//     inputs into new tables at level c.sourceLevel+1;
//   - the record is committed.
//
// The read/write statistics of both input levels are recorded under level
// c.sourceLevel+1.
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
	defer c.release()

	rec := &sessionRecord{}
	rec.addCompPtr(c.sourceLevel, c.imax)

	if !noTrivial && c.trivial() {
		t := c.levels[0][0]
		db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
		rec.delTable(c.sourceLevel, t.fd.Num)
		rec.addTableFile(c.sourceLevel+1, t)
		db.compactionCommit("table-move", rec)
		return
	}

	// stats[0] counts bytes read from the source level. stats[1] counts
	// bytes read from the next level plus build/commit time and bytes
	// written.
	// NOTE(review): this assumes c.levels has at most 2 entries; stats[i]
	// would panic otherwise. Confirm against the compaction type.
	var stats [2]cStatStaging
	for i, tables := range c.levels {
		for _, t := range tables {
			stats[i].read += t.size
			// Insert deleted tables into record
			rec.delTable(c.sourceLevel+i, t.fd.Num)
		}
	}
	sourceSize := int(stats[0].read + stats[1].read)
	minSeq := db.minSeq()
	db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)

	b := &tableCompactionBuilder{
		db:        db,
		s:         db.s,
		c:         c,
		rec:       rec,
		stat1:     &stats[1],
		minSeq:    minSeq,
		strict:    db.s.o.GetStrict(opt.StrictCompaction),
		tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
	}
	db.compactionTransact("table@build", b)

	// Commit.
	stats[1].startTimer()
	db.compactionCommit("table", rec)
	stats[1].stopTimer()

	resultSize := int(stats[1].write)
	db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)

	// Save compaction stats. Both entries are attributed to the output level.
	for i := range stats {
		db.compStats.addStat(c.sourceLevel+1, &stats[i])
	}
}
| |
| func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { |
| db.logf("table@compaction range L%d %q:%q", level, umin, umax) |
| if level >= 0 { |
| if c := db.s.getCompactionRange(level, umin, umax, true); c != nil { |
| db.tableCompaction(c, true) |
| } |
| } else { |
| // Retry until nothing to compact. |
| for { |
| compacted := false |
| |
| // Scan for maximum level with overlapped tables. |
| v := db.s.version() |
| m := 1 |
| for i := m; i < len(v.levels); i++ { |
| tables := v.levels[i] |
| if tables.overlaps(db.s.icmp, umin, umax, false) { |
| m = i |
| } |
| } |
| v.release() |
| |
| for level := 0; level < m; level++ { |
| if c := db.s.getCompactionRange(level, umin, umax, false); c != nil { |
| db.tableCompaction(c, true) |
| compacted = true |
| } |
| } |
| |
| if !compacted { |
| break |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| func (db *DB) tableAutoCompaction() { |
| if c := db.s.pickCompaction(); c != nil { |
| db.tableCompaction(c, false) |
| } |
| } |
| |
| func (db *DB) tableNeedCompaction() bool { |
| v := db.s.version() |
| defer v.release() |
| return v.needCompaction() |
| } |
| |
| func (db *DB) pauseCompaction(ch chan<- struct{}) { |
| select { |
| case ch <- struct{}{}: |
| case _, _ = <-db.closeC: |
| db.compactionExitTransact() |
| } |
| } |
| |
// cCmd is a command handed to a compaction goroutine. ack delivers the
// command's result to its issuer.
type cCmd interface {
	ack(err error)
}

// cAuto requests an auto compaction. ackC is nil when the issuer does not
// wait for completion.
type cAuto struct {
	ackC chan<- error
}

func (r cAuto) ack(err error) {
	sendCompAck(r.ackC, err)
}

// cRange requests compaction of the user-key range min..max at level. A
// negative level means all levels (see tableRangeCompaction).
type cRange struct {
	level    int
	min, max []byte
	ackC     chan<- error
}

func (r cRange) ack(err error) {
	sendCompAck(r.ackC, err)
}

// sendCompAck sends err on ackC when ackC is non-nil. The waiting side
// closes its channel as soon as it stops waiting, so the send may hit a
// closed channel. The resulting panic is deliberately swallowed.
func sendCompAck(ackC chan<- error, err error) {
	if ackC == nil {
		return
	}
	defer func() {
		recover()
	}()
	ackC <- err
}
| |
| // This will trigger auto compaction but will not wait for it. |
| func (db *DB) compTrigger(compC chan<- cCmd) { |
| select { |
| case compC <- cAuto{}: |
| default: |
| } |
| } |
| |
| // This will trigger auto compaction and/or wait for all compaction to be done. |
| func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { |
| ch := make(chan error) |
| defer close(ch) |
| // Send cmd. |
| select { |
| case compC <- cAuto{ch}: |
| case err = <-db.compErrC: |
| return |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| // Wait cmd. |
| select { |
| case err = <-ch: |
| case err = <-db.compErrC: |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| return err |
| } |
| |
| // Send range compaction request. |
| func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) { |
| ch := make(chan error) |
| defer close(ch) |
| // Send cmd. |
| select { |
| case compC <- cRange{level, min, max, ch}: |
| case err := <-db.compErrC: |
| return err |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| // Wait cmd. |
| select { |
| case err = <-ch: |
| case err = <-db.compErrC: |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| return err |
| } |
| |
| func (db *DB) mCompaction() { |
| var x cCmd |
| |
| defer func() { |
| if x := recover(); x != nil { |
| if x != errCompactionTransactExiting { |
| panic(x) |
| } |
| } |
| if x != nil { |
| x.ack(ErrClosed) |
| } |
| db.closeW.Done() |
| }() |
| |
| for { |
| select { |
| case x = <-db.mcompCmdC: |
| switch x.(type) { |
| case cAuto: |
| db.memCompaction() |
| x.ack(nil) |
| x = nil |
| default: |
| panic("leveldb: unknown command") |
| } |
| case _, _ = <-db.closeC: |
| return |
| } |
| } |
| } |
| |
// tCompaction is the table compaction goroutine. It marks db.closeW done when
// it exits.
//
// Each iteration it takes at most one command:
//   - While the current version needs compaction, it polls tcompCmdC and
//     tcompPauseC without blocking, so that auto compaction keeps running.
//   - Otherwise it acknowledges every queued cAuto command with nil and then
//     blocks for the next command, pause request or close.
//
// cAuto commands are queued until that point. cRange commands are executed
// immediately and acknowledged with their result. A pause request is served
// by pauseCompaction and skips the rest of the iteration. Otherwise each
// iteration ends with one tableAutoCompaction step.
//
// On exit, queued and in-flight commands are acknowledged with ErrClosed. A
// panic with errCompactionTransactExiting counts as a normal exit. Any other
// panic is re-raised before those acknowledgements are sent.
func (db *DB) tCompaction() {
	var x cCmd
	var ackQ []cCmd

	defer func() {
		// The recovered value shadows the outer x only inside this if.
		if x := recover(); x != nil {
			if x != errCompactionTransactExiting {
				panic(x)
			}
		}
		for i := range ackQ {
			ackQ[i].ack(ErrClosed)
			ackQ[i] = nil
		}
		// Here x is the outer in-flight command, if any.
		if x != nil {
			x.ack(ErrClosed)
		}
		db.closeW.Done()
	}()

	for {
		if db.tableNeedCompaction() {
			// Compaction pending: poll for commands without blocking.
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case _, _ = <-db.closeC:
				return
			default:
			}
		} else {
			// Nothing left to compact: the queued auto commands are done.
			for i := range ackQ {
				ackQ[i].ack(nil)
				ackQ[i] = nil
			}
			ackQ = ackQ[:0]
			select {
			case x = <-db.tcompCmdC:
			case ch := <-db.tcompPauseC:
				db.pauseCompaction(ch)
				continue
			case _, _ = <-db.closeC:
				return
			}
		}
		if x != nil {
			switch cmd := x.(type) {
			case cAuto:
				ackQ = append(ackQ, x)
			case cRange:
				x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
			default:
				panic("leveldb: unknown command")
			}
			x = nil
		}
		db.tableAutoCompaction()
	}
}