leveldb: implement dynamic levels
The levels will grow and shrink as needed.
diff --git a/leveldb/db.go b/leveldb/db.go
index 0458f55..709a926 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -70,7 +70,7 @@
compPerErrC chan error
compErrSetC chan error
compWriteLocking bool
- compStats []cStats
+ compStats cStats
memdbMaxLevel int // For testing.
// Close.
@@ -105,7 +105,6 @@
compErrC: make(chan error),
compPerErrC: make(chan error),
compErrSetC: make(chan error),
- compStats: make([]cStats, s.o.GetNumLevel()),
// Close
closeC: make(chan struct{}),
}
@@ -921,7 +920,7 @@
var level uint
var rest string
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
- if n != 1 || int(level) >= db.s.o.GetNumLevel() {
+ if n != 1 {
err = ErrNotFound
} else {
value = fmt.Sprint(v.tLen(int(level)))
@@ -930,8 +929,8 @@
value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n"
- for level, tables := range v.tables {
- duration, read, write := db.compStats[level].get()
+ for level, tables := range v.levels {
+ duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
@@ -940,7 +939,7 @@
float64(read)/1048576.0, float64(write)/1048576.0)
}
case p == "sstables":
- for level, tables := range v.tables {
+ for level, tables := range v.levels {
value += fmt.Sprintf("--- level %d ---\n", level)
for _, t := range tables {
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index a774c8a..065ca20 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -18,28 +18,23 @@
errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
)
-type cStats struct {
- sync.Mutex
+type cStat struct {
duration time.Duration
read uint64
write uint64
}
-func (p *cStats) add(n *cStatsStaging) {
- p.Lock()
+func (p *cStat) add(n *cStatStaging) {
p.duration += n.duration
p.read += n.read
p.write += n.write
- p.Unlock()
}
-func (p *cStats) get() (duration time.Duration, read, write uint64) {
- p.Lock()
- defer p.Unlock()
+func (p *cStat) get() (duration time.Duration, read, write uint64) {
return p.duration, p.read, p.write
}
-type cStatsStaging struct {
+type cStatStaging struct {
start time.Time
duration time.Duration
on bool
@@ -47,20 +42,45 @@
write uint64
}
-func (p *cStatsStaging) startTimer() {
+func (p *cStatStaging) startTimer() {
if !p.on {
p.start = time.Now()
p.on = true
}
}
-func (p *cStatsStaging) stopTimer() {
+func (p *cStatStaging) stopTimer() {
if p.on {
p.duration += time.Since(p.start)
p.on = false
}
}
+type cStats struct {
+ lk sync.Mutex
+ stats []cStat
+}
+
+func (p *cStats) addStat(level int, n *cStatStaging) {
+ p.lk.Lock()
+ if level >= len(p.stats) {
+ newStats := make([]cStat, level+1)
+ copy(newStats, p.stats)
+ p.stats = newStats
+ }
+ p.stats[level].add(n)
+ p.lk.Unlock()
+}
+
+func (p *cStats) getStat(level int) (duration time.Duration, read, write uint64) {
+ p.lk.Lock()
+ defer p.lk.Unlock()
+ if level < len(p.stats) {
+ return p.stats[level].get()
+ }
+ return
+}
+
func (db *DB) compactionError() {
var err error
noerr:
@@ -265,7 +285,7 @@
var (
rec = &sessionRecord{}
- stats = &cStatsStaging{}
+ stats = &cStatStaging{}
flushLevel int
)
@@ -299,7 +319,7 @@
for _, r := range rec.addedTables {
stats.write += r.size
}
- db.compStats[flushLevel].add(stats)
+ db.compStats.addStat(flushLevel, stats)
// Drop frozen memdb.
db.dropFrozenMem()
@@ -323,7 +343,7 @@
s *session
c *compaction
rec *sessionRecord
- stat0, stat1 *cStatsStaging
+ stat0, stat1 *cStatStaging
snapHasLastUkey bool
snapLastUkey []byte
@@ -377,9 +397,9 @@
if err != nil {
return err
}
- b.rec.addTableFile(b.c.level+1, t)
+ b.rec.addTableFile(b.c.sourceLevel+1, t)
b.stat1.write += t.size
- b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.level+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+ b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
b.tw = nil
return nil
}
@@ -514,30 +534,30 @@
defer c.release()
rec := &sessionRecord{}
- rec.addCompPtr(c.level, c.imax)
+ rec.addCompPtr(c.sourceLevel, c.imax)
if !noTrivial && c.trivial() {
- t := c.tables[0][0]
- db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
- rec.delTable(c.level, t.file.Num())
- rec.addTableFile(c.level+1, t)
+ t := c.levels[0][0]
+ db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.file.Num(), c.sourceLevel+1)
+ rec.delTable(c.sourceLevel, t.file.Num())
+ rec.addTableFile(c.sourceLevel+1, t)
db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) {
return db.s.commit(rec)
}, nil)
return
}
- var stats [2]cStatsStaging
- for i, tables := range c.tables {
+ var stats [2]cStatStaging
+ for i, tables := range c.levels {
for _, t := range tables {
stats[i].read += t.size
// Insert deleted tables into record
- rec.delTable(c.level+i, t.file.Num())
+ rec.delTable(c.sourceLevel+i, t.file.Num())
}
}
sourceSize := int(stats[0].read + stats[1].read)
minSeq := db.minSeq()
- db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
+ db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
b := &tableCompactionBuilder{
db: db,
@@ -547,7 +567,7 @@
stat1: &stats[1],
minSeq: minSeq,
strict: db.s.o.GetStrict(opt.StrictCompaction),
- tableSize: db.s.o.GetCompactionTableSize(c.level + 1),
+ tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
}
db.compactionTransact("table@build", b)
@@ -563,15 +583,11 @@
// Save compaction stats
for i := range stats {
- db.compStats[c.level+1].add(&stats[i])
+ db.compStats.addStat(c.sourceLevel+1, &stats[i])
}
}
func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
- if level >= db.s.o.GetNumLevel() {
- return errors.New("leveldb: invalid compaction level")
- }
-
db.logf("table@compaction range L%d %q:%q", level, umin, umax)
if level >= 0 {
if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
@@ -585,9 +601,10 @@
// Scan for maximum level with overlapped tables.
v := db.s.version()
m := 1
- for i, t := range v.tables[1:] {
- if t.overlaps(db.s.icmp, umin, umax, false) {
- m = i + 1
+ for i := m; i < len(v.levels); i++ {
+ tables := v.levels[i]
+ if tables.overlaps(db.s.icmp, umin, umax, false) {
+ m = i
}
}
v.release()
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index ba37422..8ec6629 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -158,15 +158,17 @@
maxLevel int
)
v := db.s.version()
- for i, tt := range v.tables[1 : len(v.tables)-1] {
- level := i + 1
- next := v.tables[level+1]
- for _, t := range tt {
- r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
- sum := r.size()
- if sum > maxOverlaps {
- maxOverlaps = sum
- maxLevel = level
+ if len(v.levels) > 2 {
+ for i, tt := range v.levels[1 : len(v.levels)-1] {
+ level := i + 1
+ next := v.levels[level+1]
+ for _, t := range tt {
+ r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
+ sum := r.size()
+ if sum > maxOverlaps {
+ maxOverlaps = sum
+ maxLevel = level
+ }
}
}
}
@@ -435,12 +437,12 @@
res := ""
nz := 0
v := h.db.s.version()
- for level, tt := range v.tables {
+ for level, tables := range v.levels {
if level > 0 {
res += ","
}
- res += fmt.Sprint(len(tt))
- if len(tt) > 0 {
+ res += fmt.Sprint(len(tables))
+ if len(tables) > 0 {
nz = len(res)
}
}
@@ -457,8 +459,8 @@
func (h *dbHarness) totalTables() (n int) {
v := h.db.s.version()
- for _, tt := range v.tables {
- n += len(tt)
+ for _, tables := range v.levels {
+ n += len(tables)
}
v.release()
return
@@ -974,7 +976,7 @@
h := newDbHarnessWopt(t, &opt.Options{WriteBuffer: 100000})
defer h.close()
- maxTables := h.o.GetNumLevel() + h.o.GetWriteL0PauseTrigger()
+ maxTables := h.o.GetWriteL0PauseTrigger() + 7
value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
for i := 0; i < 5*maxTables; i++ {
@@ -992,7 +994,7 @@
h.reopenDB()
- maxTables := h.o.GetNumLevel() + h.o.GetWriteL0PauseTrigger()
+ maxTables := h.o.GetWriteL0PauseTrigger() + 7
value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
for i := 0; i < 5*maxTables; i++ {
@@ -1008,7 +1010,7 @@
h := newDbHarnessWopt(t, &opt.Options{Compression: opt.NoCompression})
defer h.close()
- h.putMulti(h.o.GetNumLevel(), "A", "Z")
+ h.putMulti(7, "A", "Z")
// Suppose there is:
// small amount of data with prefix A
@@ -2365,7 +2367,7 @@
h.compactMem()
h.waitCompaction()
- for level, tables := range h.db.s.stVersion.tables {
+ for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
}
@@ -2373,14 +2375,14 @@
h.compactRangeAt(0, "", "")
h.waitCompaction()
- for level, tables := range h.db.s.stVersion.tables {
+ for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
}
}
h.compactRangeAt(1, "", "")
h.waitCompaction()
- for level, tables := range h.db.s.stVersion.tables {
+ for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
}
@@ -2465,13 +2467,13 @@
// Build grandparent.
v := s.version()
- c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+ c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
rec := &sessionRecord{}
b := &tableCompactionBuilder{
s: s,
c: c,
rec: rec,
- stat1: new(cStatsStaging),
+ stat1: new(cStatStaging),
minSeq: 0,
strict: true,
tableSize: o.CompactionTableSize/3 + 961,
@@ -2479,8 +2481,8 @@
if err := b.run(new(compactionTransactCounter)); err != nil {
t.Fatal(err)
}
- for _, t := range c.tables[0] {
- rec.delTable(c.level, t.file.Num())
+ for _, t := range c.levels[0] {
+ rec.delTable(c.sourceLevel, t.file.Num())
}
if err := s.commit(rec); err != nil {
t.Fatal(err)
@@ -2489,13 +2491,13 @@
// Build level-1.
v = s.version()
- c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
+ c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...))
rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
rec: rec,
- stat1: new(cStatsStaging),
+ stat1: new(cStatStaging),
minSeq: 0,
strict: true,
tableSize: o.CompactionTableSize,
@@ -2503,11 +2505,11 @@
if err := b.run(new(compactionTransactCounter)); err != nil {
t.Fatal(err)
}
- for _, t := range c.tables[0] {
- rec.delTable(c.level, t.file.Num())
+ for _, t := range c.levels[0] {
+ rec.delTable(c.sourceLevel, t.file.Num())
}
// Move grandparent to level-3
- for _, t := range v.tables[2] {
+ for _, t := range v.levels[2] {
rec.delTable(2, t.file.Num())
rec.addTableFile(3, t)
}
@@ -2517,14 +2519,14 @@
c.release()
v = s.version()
- for level, want := range []bool{false, true, false, true, false} {
- got := len(v.tables[level]) > 0
+ for level, want := range []bool{false, true, false, true} {
+ got := len(v.levels[level]) > 0
if want != got {
t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
}
}
- for i, f := range v.tables[1][:len(v.tables[1])-1] {
- nf := v.tables[1][i+1]
+ for i, f := range v.levels[1][:len(v.levels[1])-1] {
+ nf := v.levels[1][i+1]
if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.file.Num(), nf.file.Num())
}
@@ -2533,13 +2535,13 @@
// Compaction with transient error.
v = s.version()
- c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
+ c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...))
rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
rec: rec,
- stat1: new(cStatsStaging),
+ stat1: new(cStatStaging),
minSeq: 0,
strict: true,
tableSize: o.CompactionTableSize,
@@ -2563,11 +2565,11 @@
stor.SetEmuRandErr(0, tsOpRead, tsOpReadAt, tsOpWrite)
v = s.version()
- if len(v.tables[1]) != len(v.tables[2]) {
- t.Fatalf("invalid tables length, want %d, got %d", len(v.tables[1]), len(v.tables[2]))
+ if len(v.levels[1]) != len(v.levels[2]) {
+ t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
}
- for i, f0 := range v.tables[1] {
- f1 := v.tables[2][i]
+ for i, f0 := range v.levels[1] {
+ f1 := v.levels[2][i]
iter0 := s.tops.newIterator(f0, nil, nil)
iter1 := s.tops.newIterator(f1, nil, nil)
for j := 0; true; j++ {
diff --git a/leveldb/db_util.go b/leveldb/db_util.go
index a8a2bdf..23b866d 100644
--- a/leveldb/db_util.go
+++ b/leveldb/db_util.go
@@ -41,7 +41,7 @@
defer v.release()
tablesMap := make(map[uint64]bool)
- for _, tables := range v.tables {
+ for _, tables := range v.levels {
for _, t := range tables {
tablesMap[t.file.Num()] = false
}
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 25313da..190a5c0 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -35,7 +35,6 @@
DefaultCompactionTotalSizeMultiplier = 10.0
DefaultCompressionType = SnappyCompression
DefaultIteratorSamplingRate = 1 * MiB
- DefaultNumLevel = 7
DefaultOpenFilesCacher = LRUCacher
DefaultOpenFilesCacheCapacity = 500
DefaultWriteBuffer = 4 * MiB
@@ -305,12 +304,6 @@
// The default is false.
NoSync bool
- // NumLevel defines number of database level. The level shouldn't changed
- // between opens, or the database will panic.
- //
- // The default is 7.
- NumLevel int
-
// OpenFilesCacher provides cache algorithm for open files caching.
// Specify NoCacher to disable caching algorithm.
//
@@ -432,7 +425,7 @@
if o.CompactionTableSize > 0 {
base = o.CompactionTableSize
}
- if len(o.CompactionTableSizeMultiplierPerLevel) > level && o.CompactionTableSizeMultiplierPerLevel[level] > 0 {
+ if level < len(o.CompactionTableSizeMultiplierPerLevel) && o.CompactionTableSizeMultiplierPerLevel[level] > 0 {
mult = o.CompactionTableSizeMultiplierPerLevel[level]
} else if o.CompactionTableSizeMultiplier > 0 {
mult = math.Pow(o.CompactionTableSizeMultiplier, float64(level))
@@ -453,7 +446,7 @@
if o.CompactionTotalSize > 0 {
base = o.CompactionTotalSize
}
- if len(o.CompactionTotalSizeMultiplierPerLevel) > level && o.CompactionTotalSizeMultiplierPerLevel[level] > 0 {
+ if level < len(o.CompactionTotalSizeMultiplierPerLevel) && o.CompactionTotalSizeMultiplierPerLevel[level] > 0 {
mult = o.CompactionTotalSizeMultiplierPerLevel[level]
} else if o.CompactionTotalSizeMultiplier > 0 {
mult = math.Pow(o.CompactionTotalSizeMultiplier, float64(level))
@@ -535,13 +528,6 @@
return o.NoSync
}
-func (o *Options) GetNumLevel() int {
- if o == nil || o.NumLevel <= 0 {
- return DefaultNumLevel
- }
- return o.NumLevel
-}
-
func (o *Options) GetOpenFilesCacher() Cacher {
if o == nil || o.OpenFilesCacher == nil {
return DefaultOpenFilesCacher
diff --git a/leveldb/options.go b/leveldb/options.go
index a3d84ef..b072b1a 100644
--- a/leveldb/options.go
+++ b/leveldb/options.go
@@ -43,6 +43,8 @@
s.o.cache()
}
+const optCachedLevel = 7
+
type cachedOptions struct {
*opt.Options
@@ -54,15 +56,13 @@
}
func (co *cachedOptions) cache() {
- numLevel := co.Options.GetNumLevel()
+ co.compactionExpandLimit = make([]int, optCachedLevel)
+ co.compactionGPOverlaps = make([]int, optCachedLevel)
+ co.compactionSourceLimit = make([]int, optCachedLevel)
+ co.compactionTableSize = make([]int, optCachedLevel)
+ co.compactionTotalSize = make([]int64, optCachedLevel)
- co.compactionExpandLimit = make([]int, numLevel)
- co.compactionGPOverlaps = make([]int, numLevel)
- co.compactionSourceLimit = make([]int, numLevel)
- co.compactionTableSize = make([]int, numLevel)
- co.compactionTotalSize = make([]int64, numLevel)
-
- for level := 0; level < numLevel; level++ {
+ for level := 0; level < optCachedLevel; level++ {
co.compactionExpandLimit[level] = co.Options.GetCompactionExpandLimit(level)
co.compactionGPOverlaps[level] = co.Options.GetCompactionGPOverlaps(level)
co.compactionSourceLimit[level] = co.Options.GetCompactionSourceLimit(level)
@@ -72,21 +72,36 @@
}
func (co *cachedOptions) GetCompactionExpandLimit(level int) int {
- return co.compactionExpandLimit[level]
+ if level < optCachedLevel {
+ return co.compactionExpandLimit[level]
+ }
+ return co.Options.GetCompactionExpandLimit(level)
}
func (co *cachedOptions) GetCompactionGPOverlaps(level int) int {
- return co.compactionGPOverlaps[level]
+ if level < optCachedLevel {
+ return co.compactionGPOverlaps[level]
+ }
+ return co.Options.GetCompactionGPOverlaps(level)
}
func (co *cachedOptions) GetCompactionSourceLimit(level int) int {
- return co.compactionSourceLimit[level]
+ if level < optCachedLevel {
+ return co.compactionSourceLimit[level]
+ }
+ return co.Options.GetCompactionSourceLimit(level)
}
func (co *cachedOptions) GetCompactionTableSize(level int) int {
- return co.compactionTableSize[level]
+ if level < optCachedLevel {
+ return co.compactionTableSize[level]
+ }
+ return co.Options.GetCompactionTableSize(level)
}
func (co *cachedOptions) GetCompactionTotalSize(level int) int64 {
- return co.compactionTotalSize[level]
+ if level < optCachedLevel {
+ return co.compactionTotalSize[level]
+ }
+ return co.Options.GetCompactionTotalSize(level)
}
diff --git a/leveldb/session.go b/leveldb/session.go
index f0bba46..ade94e5 100644
--- a/leveldb/session.go
+++ b/leveldb/session.go
@@ -66,9 +66,8 @@
return
}
s = &session{
- stor: stor,
- storLock: storLock,
- stCompPtrs: make([]iKey, o.GetNumLevel()),
+ stor: stor,
+ storLock: storLock,
}
s.setOptions(o)
s.tops = newTableOps(s)
@@ -128,8 +127,7 @@
var (
// Options.
- numLevel = s.o.GetNumLevel()
- strict = s.o.GetStrict(opt.StrictManifest)
+ strict = s.o.GetStrict(opt.StrictManifest)
jr = journal.NewReader(reader, dropper{s, m}, strict, true)
rec = &sessionRecord{}
@@ -146,11 +144,11 @@
return errors.SetFile(err, m)
}
- err = rec.decode(r, numLevel)
+ err = rec.decode(r)
if err == nil {
// save compact pointers
for _, r := range rec.compPtrs {
- s.stCompPtrs[r.level] = iKey(r.ikey)
+ s.setCompPtr(r.level, iKey(r.ikey))
}
// commit record to version staging
staging.commit(rec)
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
index b99e86d..417b8a1 100644
--- a/leveldb/session_compaction.go
+++ b/leveldb/session_compaction.go
@@ -20,13 +20,13 @@
return v.pickMemdbLevel(umin, umax, maxLevel)
}
-func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (level int, err error) {
+func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
// Create sorted table.
iter := mdb.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
- return
+ return 0, err
}
// Pick level other than zero can cause compaction issue with large
@@ -37,23 +37,23 @@
// higher level, thus maximum possible level is always picked, while
// overlapping deletion marker pushed into lower level.
// See: https://github.com/syndtr/goleveldb/issues/127.
- level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
- rec.addTableFile(level, t)
+ flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
+ rec.addTableFile(flushLevel, t)
- s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
- return level, nil
+ s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
+ return flushLevel, nil
}
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
v := s.version()
- var level int
+ var sourceLevel int
var t0 tFiles
if v.cScore >= 1 {
- level = v.cLevel
- cptr := s.stCompPtrs[level]
- tables := v.tables[level]
+ sourceLevel = v.cLevel
+ cptr := s.getCompPtr(sourceLevel)
+ tables := v.levels[sourceLevel]
for _, t := range tables {
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
@@ -66,7 +66,7 @@
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
- level = ts.level
+ sourceLevel = ts.level
t0 = append(t0, ts.table)
} else {
v.release()
@@ -74,14 +74,19 @@
}
}
- return newCompaction(s, v, level, t0)
+ return newCompaction(s, v, sourceLevel, t0)
}
// Create compaction from given level and range; need external synchronization.
-func (s *session) getCompactionRange(level int, umin, umax []byte, noLimit bool) *compaction {
+func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
v := s.version()
- t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
+ if sourceLevel >= len(v.levels) {
+ v.release()
+ return nil
+ }
+
+ t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
if len(t0) == 0 {
v.release()
return nil
@@ -91,8 +96,8 @@
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
- if !noLimit && level > 0 {
- limit := uint64(v.s.o.GetCompactionSourceLimit(level))
+ if !noLimit && sourceLevel > 0 {
+ limit := uint64(v.s.o.GetCompactionSourceLimit(sourceLevel))
total := uint64(0)
for i, t := range t0 {
total += t.size
@@ -104,17 +109,17 @@
}
}
- return newCompaction(s, v, level, t0)
+ return newCompaction(s, v, sourceLevel, t0)
}
-func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
+func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
c := &compaction{
s: s,
v: v,
- level: level,
- tables: [2]tFiles{t0, nil},
- maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
- tPtrs: make([]int, s.o.GetNumLevel()),
+ sourceLevel: sourceLevel,
+ levels: [2]tFiles{t0, nil},
+ maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(sourceLevel)),
+ tPtrs: make([]int, len(v.levels)),
}
c.expand()
c.save()
@@ -126,8 +131,8 @@
s *session
v *version
- level int
- tables [2]tFiles
+ sourceLevel int
+ levels [2]tFiles
maxGPOverlaps uint64
gp tFiles
@@ -167,30 +172,34 @@
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
- limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
- vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
+ limit := uint64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
+ vt0 := c.v.levels[c.sourceLevel]
+ vt1 := tFiles{}
+ if level := c.sourceLevel + 1; level < len(c.v.levels) {
+ vt1 = c.v.levels[level]
+ }
- t0, t1 := c.tables[0], c.tables[1]
+ t0, t1 := c.levels[0], c.levels[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
- t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
- if len(t0) != len(c.tables[0]) {
+ t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
+ if len(t0) != len(c.levels[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
amin, amax := append(t0, t1...).getRange(c.s.icmp)
- // See if we can grow the number of inputs in "level" without
- // changing the number of "level+1" files we pick up.
+ // See if we can grow the number of inputs in "sourceLevel" without
+ // changing the number of "sourceLevel+1" files we pick up.
if len(t1) > 0 {
- exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
+ exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
xmin, xmax := exp0.getRange(c.s.icmp)
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
- c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
+ c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
@@ -200,23 +209,23 @@
}
// Compute the set of grandparent files that overlap this compaction
- // (parent == level+1; grandparent == level+2)
- if c.level+2 < c.s.o.GetNumLevel() {
- c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
+ // (parent == sourceLevel+1; grandparent == sourceLevel+2)
+ if level := c.sourceLevel + 2; level < len(c.v.levels) {
+ c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
- c.tables[0], c.tables[1] = t0, t1
+ c.levels[0], c.levels[1] = t0, t1
c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
- return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
+ return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
- for level := c.level + 2; level < len(c.v.tables); level++ {
- tables := c.v.tables[level]
+ for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
+ tables := c.v.levels[level]
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
@@ -256,10 +265,10 @@
// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
// Creates iterator slice.
- icap := len(c.tables)
- if c.level == 0 {
+ icap := len(c.levels)
+ if c.sourceLevel == 0 {
// Special case for level-0.
- icap = len(c.tables[0]) + 1
+ icap = len(c.levels[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
@@ -273,13 +282,13 @@
ro.Strict |= opt.StrictReader
}
- for i, tables := range c.tables {
+ for i, tables := range c.levels {
if len(tables) == 0 {
continue
}
// Level-0 is not sorted and may overlaps each other.
- if c.level+i == 0 {
+ if c.sourceLevel+i == 0 {
for _, t := range tables {
its = append(its, c.s.tops.newIterator(t, nil, ro))
}
diff --git a/leveldb/session_record.go b/leveldb/session_record.go
index 405e07b..a09b2bc 100644
--- a/leveldb/session_record.go
+++ b/leveldb/session_record.go
@@ -228,7 +228,7 @@
return x
}
-func (p *sessionRecord) readLevel(field string, r io.ByteReader, numLevel int) int {
+func (p *sessionRecord) readLevel(field string, r io.ByteReader) int {
if p.err != nil {
return 0
}
@@ -236,14 +236,10 @@
if p.err != nil {
return 0
}
- if x >= uint64(numLevel) {
- p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"})
- return 0
- }
return int(x)
}
-func (p *sessionRecord) decode(r io.Reader, numLevel int) error {
+func (p *sessionRecord) decode(r io.Reader) error {
br, ok := r.(byteReader)
if !ok {
br = bufio.NewReader(r)
@@ -284,13 +280,13 @@
p.setSeqNum(x)
}
case recCompPtr:
- level := p.readLevel("comp-ptr.level", br, numLevel)
+ level := p.readLevel("comp-ptr.level", br)
ikey := p.readBytes("comp-ptr.ikey", br)
if p.err == nil {
p.addCompPtr(level, iKey(ikey))
}
case recAddTable:
- level := p.readLevel("add-table.level", br, numLevel)
+ level := p.readLevel("add-table.level", br)
num := p.readUvarint("add-table.num", br)
size := p.readUvarint("add-table.size", br)
imin := p.readBytes("add-table.imin", br)
@@ -299,7 +295,7 @@
p.addTable(level, num, size, imin, imax)
}
case recDelTable:
- level := p.readLevel("del-table.level", br, numLevel)
+ level := p.readLevel("del-table.level", br)
num := p.readUvarint("del-table.num", br)
if p.err == nil {
p.delTable(level, num)
diff --git a/leveldb/session_record_test.go b/leveldb/session_record_test.go
index 33c1487..be853a7 100644
--- a/leveldb/session_record_test.go
+++ b/leveldb/session_record_test.go
@@ -9,8 +9,6 @@
import (
"bytes"
"testing"
-
- "github.com/syndtr/goleveldb/leveldb/opt"
)
func decodeEncode(v *sessionRecord) (res bool, err error) {
@@ -20,7 +18,7 @@
return
}
v2 := &sessionRecord{}
- err = v.decode(b, opt.DefaultNumLevel)
+ err = v.decode(b)
if err != nil {
return
}
diff --git a/leveldb/session_util.go b/leveldb/session_util.go
index 7ec9f86..a7a480a 100644
--- a/leveldb/session_util.go
+++ b/leveldb/session_util.go
@@ -121,6 +121,24 @@
}
}
+// Set compaction ptr at given level; need external synchronization.
+func (s *session) setCompPtr(level int, ik iKey) {
+ if level >= len(s.stCompPtrs) {
+ newCompPtrs := make([]iKey, level+1)
+ copy(newCompPtrs, s.stCompPtrs)
+ s.stCompPtrs = newCompPtrs
+ }
+ s.stCompPtrs[level] = append(iKey{}, ik...)
+}
+
+// Get compaction ptr at given level; need external synchronization.
+func (s *session) getCompPtr(level int) iKey {
+ if level >= len(s.stCompPtrs) {
+ return nil
+ }
+ return s.stCompPtrs[level]
+}
+
// Manifest related utils.
// Fill given session record obj with current states; need external
@@ -149,21 +167,21 @@
// Mark if record has been committed, this will update session state;
// need external synchronization.
-func (s *session) recordCommited(r *sessionRecord) {
- if r.has(recJournalNum) {
- s.stJournalNum = r.journalNum
+func (s *session) recordCommited(rec *sessionRecord) {
+ if rec.has(recJournalNum) {
+ s.stJournalNum = rec.journalNum
}
- if r.has(recPrevJournalNum) {
- s.stPrevJournalNum = r.prevJournalNum
+ if rec.has(recPrevJournalNum) {
+ s.stPrevJournalNum = rec.prevJournalNum
}
- if r.has(recSeqNum) {
- s.stSeqNum = r.seqNum
+ if rec.has(recSeqNum) {
+ s.stSeqNum = rec.seqNum
}
- for _, p := range r.compPtrs {
- s.stCompPtrs[p.level] = iKey(p.ikey)
+ for _, r := range rec.compPtrs {
+ s.setCompPtr(r.level, iKey(r.ikey))
}
}
diff --git a/leveldb/version.go b/leveldb/version.go
index d6497dd..015dc72 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -7,6 +7,7 @@
package leveldb
import (
+ "fmt"
"sync/atomic"
"unsafe"
@@ -23,7 +24,7 @@
type version struct {
s *session
- tables []tFiles
+ levels []tFiles
// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
@@ -39,7 +40,7 @@
}
func newVersion(s *session) *version {
- return &version{s: s, tables: make([]tFiles, s.o.GetNumLevel())}
+ return &version{s: s}
}
func (v *version) releaseNB() {
@@ -51,18 +52,18 @@
panic("negative version ref")
}
- tables := make(map[uint64]bool)
- for _, tt := range v.next.tables {
+ nextTables := make(map[uint64]bool)
+ for _, tt := range v.next.levels {
for _, t := range tt {
num := t.file.Num()
- tables[num] = true
+ nextTables[num] = true
}
}
- for _, tt := range v.tables {
+ for _, tt := range v.levels {
for _, t := range tt {
num := t.file.Num()
- if _, ok := tables[num]; !ok {
+ if _, ok := nextTables[num]; !ok {
v.s.tops.remove(t)
}
}
@@ -82,7 +83,7 @@
ukey := ikey.ukey()
// Walk tables level-by-level.
- for level, tables := range v.tables {
+ for level, tables := range v.levels {
if len(tables) == 0 {
continue
}
@@ -150,6 +151,7 @@
} else {
fikey, fval, ferr = v.s.tops.find(t, ikey, ro)
}
+
switch ferr {
case nil:
case ErrNotFound:
@@ -228,27 +230,22 @@
}
func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
- // Merge all level zero files together since they may overlap
- for _, t := range v.tables[0] {
- it := v.s.tops.newIterator(t, slice, ro)
- its = append(its, it)
- }
-
strict := opt.GetStrict(v.s.o.Options, ro, opt.StrictReader)
- for _, tables := range v.tables[1:] {
- if len(tables) == 0 {
- continue
+ for level, tables := range v.levels {
+ if level == 0 {
+ // Merge all level zero files together since they may overlap.
+ for _, t := range tables {
+ its = append(its, v.s.tops.newIterator(t, slice, ro))
+ }
+ } else if len(tables) != 0 {
+ its = append(its, iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict))
}
-
- it := iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict)
- its = append(its, it)
}
-
return
}
func (v *version) newStaging() *versionStaging {
- return &versionStaging{base: v, tables: make([]tablesScratch, v.s.o.GetNumLevel())}
+ return &versionStaging{base: v}
}
// Spawn a new version based on this version.
@@ -259,19 +256,22 @@
}
func (v *version) fillRecord(r *sessionRecord) {
- for level, ts := range v.tables {
- for _, t := range ts {
+ for level, tables := range v.levels {
+ for _, t := range tables {
r.addTableFile(level, t)
}
}
}
func (v *version) tLen(level int) int {
- return len(v.tables[level])
+ if level < len(v.levels) {
+ return len(v.levels[level])
+ }
+ return 0
}
func (v *version) offsetOf(ikey iKey) (n uint64, err error) {
- for level, tables := range v.tables {
+ for level, tables := range v.levels {
for _, t := range tables {
if v.s.icmp.Compare(t.imax, ikey) <= 0 {
// Entire file is before "ikey", so just add the file size
@@ -302,15 +302,22 @@
func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) {
if maxLevel > 0 {
- if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
+ if len(v.levels) == 0 {
+ return maxLevel
+ }
+ if !v.levels[0].overlaps(v.s.icmp, umin, umax, true) {
var overlaps tFiles
for ; level < maxLevel; level++ {
- if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
+ if pLevel := level + 1; pLevel >= len(v.levels) {
+ return maxLevel
+ } else if v.levels[pLevel].overlaps(v.s.icmp, umin, umax, false) {
break
}
- overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
- if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
- break
+ if gpLevel := level + 2; gpLevel < len(v.levels) {
+ overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
+ if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
+ break
+ }
}
}
}
@@ -320,18 +327,23 @@
func (v *version) computeCompaction() {
// Precomputed best level for next compaction
- var bestLevel int = -1
- var bestScore float64 = -1
+ bestLevel := int(-1)
+ bestScore := float64(-1)
- // Last level can't be compacted, so exclude from calculation.
- for level, tables := range v.tables[:len(v.tables)-1] {
+ statFiles := make([]int, len(v.levels))
+ statSizes := make([]string, len(v.levels))
+ statScore := make([]string, len(v.levels))
+ statTotSize := uint64(0)
+
+ for level, tables := range v.levels {
var score float64
+ size := tables.size()
if level == 0 {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
- // many level-0 compactions.
+ // many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
@@ -340,17 +352,24 @@
// overwrites/deletions).
score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger())
} else {
- score = float64(tables.size()) / float64(v.s.o.GetCompactionTotalSize(level))
+ score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level))
}
if score > bestScore {
bestLevel = level
bestScore = score
}
+
+ statFiles[level] = len(tables)
+ statSizes[level] = shortenb(int(size))
+ statScore[level] = fmt.Sprintf("%.2f", score)
+ statTotSize += size
}
v.cLevel = bestLevel
v.cScore = bestScore
+
+ v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore)
}
func (v *version) needCompaction() bool {
@@ -364,37 +383,42 @@
type versionStaging struct {
base *version
- tables []tablesScratch
+ levels []tablesScratch
+}
+
+func (p *versionStaging) getScratch(level int) *tablesScratch {
+ if level >= len(p.levels) {
+ newLevels := make([]tablesScratch, level+1)
+ copy(newLevels, p.levels)
+ p.levels = newLevels
+ }
+ return &(p.levels[level])
}
func (p *versionStaging) commit(r *sessionRecord) {
// Deleted tables.
for _, r := range r.deletedTables {
- tm := &(p.tables[r.level])
-
- if len(p.base.tables[r.level]) > 0 {
- if tm.deleted == nil {
- tm.deleted = make(map[uint64]struct{})
+ scratch := p.getScratch(r.level)
+ if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
+ if scratch.deleted == nil {
+ scratch.deleted = make(map[uint64]struct{})
}
- tm.deleted[r.num] = struct{}{}
+ scratch.deleted[r.num] = struct{}{}
}
-
- if tm.added != nil {
- delete(tm.added, r.num)
+ if scratch.added != nil {
+ delete(scratch.added, r.num)
}
}
// New tables.
for _, r := range r.addedTables {
- tm := &(p.tables[r.level])
-
- if tm.added == nil {
- tm.added = make(map[uint64]atRecord)
+ scratch := p.getScratch(r.level)
+ if scratch.added == nil {
+ scratch.added = make(map[uint64]atRecord)
}
- tm.added[r.num] = r
-
- if tm.deleted != nil {
- delete(tm.deleted, r.num)
+ scratch.added[r.num] = r
+ if scratch.deleted != nil {
+ delete(scratch.deleted, r.num)
}
}
}
@@ -402,39 +426,62 @@
func (p *versionStaging) finish() *version {
// Build new version.
nv := newVersion(p.base.s)
- for level, tm := range p.tables {
- btables := p.base.tables[level]
-
- n := len(btables) + len(tm.added) - len(tm.deleted)
- if n < 0 {
- n = 0
- }
- nt := make(tFiles, 0, n)
-
- // Base tables.
- for _, t := range btables {
- if _, ok := tm.deleted[t.file.Num()]; ok {
- continue
- }
- if _, ok := tm.added[t.file.Num()]; ok {
- continue
- }
- nt = append(nt, t)
- }
-
- // New tables.
- for _, r := range tm.added {
- nt = append(nt, p.base.s.tableFileFromRecord(r))
- }
-
- // Sort tables.
- if level == 0 {
- nt.sortByNum()
- } else {
- nt.sortByKey(p.base.s.icmp)
- }
- nv.tables[level] = nt
+ numLevel := len(p.levels)
+ if len(p.base.levels) > numLevel {
+ numLevel = len(p.base.levels)
}
+ nv.levels = make([]tFiles, numLevel)
+ for level := 0; level < numLevel; level++ {
+ var baseTables tFiles
+ if level < len(p.base.levels) {
+ baseTables = p.base.levels[level]
+ }
+
+ if level < len(p.levels) {
+ scratch := p.levels[level]
+
+ var nt tFiles
+ // Prealloc list if possible.
+ if n := len(baseTables) + len(scratch.added) - len(scratch.deleted); n > 0 {
+ nt = make(tFiles, 0, n)
+ }
+
+ // Base tables.
+ for _, t := range baseTables {
+ if _, ok := scratch.deleted[t.file.Num()]; ok {
+ continue
+ }
+ if _, ok := scratch.added[t.file.Num()]; ok {
+ continue
+ }
+ nt = append(nt, t)
+ }
+
+ // New tables.
+ for _, r := range scratch.added {
+ nt = append(nt, p.base.s.tableFileFromRecord(r))
+ }
+
+ if len(nt) != 0 {
+ // Sort tables.
+ if level == 0 {
+ nt.sortByNum()
+ } else {
+ nt.sortByKey(p.base.s.icmp)
+ }
+
+ nv.levels[level] = nt
+ }
+ } else {
+ nv.levels[level] = baseTabels
+ }
+ }
+
+ // Trim levels.
+ n := len(nv.levels)
+ for ; n > 0 && nv.levels[n-1] == nil; n-- {
+ }
+ nv.levels = nv.levels[:n]
// Compute compaction score for new version.
nv.computeCompaction()
diff --git a/leveldb/version_test.go b/leveldb/version_test.go
new file mode 100644
index 0000000..87d54fb
--- /dev/null
+++ b/leveldb/version_test.go
@@ -0,0 +1,176 @@
+package leveldb
+
+import (
+ "encoding/binary"
+ "reflect"
+ "testing"
+)
+
+type testFileRec struct {
+ level int
+ num uint64
+}
+
+func TestVersionStaging(t *testing.T) {
+ stor := newTestStorage(t)
+ defer stor.Close()
+ s, err := newSession(stor, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ v := newVersion(s)
+ v.newStaging()
+
+ tmp := make([]byte, 4)
+ makeIKey := func(i uint64) []byte {
+ binary.BigEndian.PutUint32(tmp, uint32(i))
+ return []byte(newIkey(tmp, 0, ktVal))
+ }
+
+ for i, x := range []struct {
+ add, del []testFileRec
+ levels [][]uint64
+ }{
+ {
+ add: []testFileRec{
+ {1, 1},
+ },
+ levels: [][]uint64{
+ {},
+ {1},
+ },
+ },
+ {
+ add: []testFileRec{
+ {1, 1},
+ },
+ levels: [][]uint64{
+ {},
+ {1},
+ },
+ },
+ {
+ del: []testFileRec{
+ {1, 1},
+ },
+ levels: [][]uint64{},
+ },
+ {
+ add: []testFileRec{
+ {0, 1},
+ {0, 3},
+ {0, 2},
+ {2, 5},
+ {1, 4},
+ },
+ levels: [][]uint64{
+ {3, 2, 1},
+ {4},
+ {5},
+ },
+ },
+ {
+ add: []testFileRec{
+ {1, 6},
+ {2, 5},
+ },
+ del: []testFileRec{
+ {0, 1},
+ {0, 4},
+ },
+ levels: [][]uint64{
+ {3, 2},
+ {4, 6},
+ {5},
+ },
+ },
+ {
+ del: []testFileRec{
+ {0, 3},
+ {0, 2},
+ {1, 4},
+ {1, 6},
+ {2, 5},
+ },
+ levels: [][]uint64{},
+ },
+ {
+ add: []testFileRec{
+ {0, 1},
+ },
+ levels: [][]uint64{
+ {1},
+ },
+ },
+ {
+ add: []testFileRec{
+ {1, 2},
+ },
+ levels: [][]uint64{
+ {1},
+ {2},
+ },
+ },
+ {
+ add: []testFileRec{
+ {0, 3},
+ },
+ levels: [][]uint64{
+ {3, 1},
+ {2},
+ },
+ },
+ {
+ add: []testFileRec{
+ {6, 9},
+ },
+ levels: [][]uint64{
+ {3, 1},
+ {2},
+ {},
+ {},
+ {},
+ {},
+ {9},
+ },
+ },
+ {
+ del: []testFileRec{
+ {6, 9},
+ },
+ levels: [][]uint64{
+ {3, 1},
+ {2},
+ },
+ },
+ } {
+ rec := &sessionRecord{}
+ for _, f := range x.add {
+ ik := makeIKey(f.num)
+ rec.addTable(f.level, f.num, 1, ik, ik)
+ }
+ for _, f := range x.del {
+ rec.delTable(f.level, f.num)
+ }
+ vs := v.newStaging()
+ vs.commit(rec)
+ v = vs.finish()
+ if len(v.levels) != len(x.levels) {
+ t.Fatalf("#%d: invalid level count: want=%d got=%d", i, len(x.levels), len(v.levels))
+ }
+ for j, want := range x.levels {
+ tables := v.levels[j]
+ if len(want) != len(tables) {
+ t.Fatalf("#%d.%d: invalid tables count: want=%d got=%d", i, j, len(want), len(tables))
+ }
+ got := make([]uint64, len(tables))
+ for k, t := range tables {
+ got[k] = t.file.Num()
+ }
+ if !reflect.DeepEqual(want, got) {
+ t.Fatalf("#%d.%d: invalid tables: want=%v got=%v", i, j, want, got)
+ }
+ }
+ }
+}