| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "bytes" |
| "container/list" |
| crand "crypto/rand" |
| "encoding/binary" |
| "fmt" |
| "math/rand" |
| "os" |
| "path/filepath" |
| "runtime" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "testing" |
| "time" |
| "unsafe" |
| |
| "github.com/onsi/gomega" |
| |
| "github.com/syndtr/goleveldb/leveldb/comparer" |
| "github.com/syndtr/goleveldb/leveldb/errors" |
| "github.com/syndtr/goleveldb/leveldb/filter" |
| "github.com/syndtr/goleveldb/leveldb/iterator" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/storage" |
| "github.com/syndtr/goleveldb/leveldb/testutil" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
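// tkey formats i as a zero-padded 16-digit key, so that numeric order and
// bytewise key order agree.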
| func tkey(i int) []byte { |
| return []byte(fmt.Sprintf("%016d", i)) |
| } |
| |
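// tval returns a deterministic pseudo-random value of length n derived from
// seed, so repeated calls with the same arguments yield the same value.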
| func tval(seed, n int) []byte { |
| r := rand.New(rand.NewSource(int64(seed))) |
| return randomString(r, n) |
| } |
| |
| func testingLogger(t *testing.T) func(log string) { |
| return func(log string) { |
| t.Log(log) |
| } |
| } |
| |
| func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) { |
| return func() (preserve bool, err error) { |
| preserve = t.Failed() |
| return |
| } |
| } |
| |
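// dbHarness bundles a DB, its backing test storage, and the options used to
// open it, along with helpers that fail the test on unexpected errors.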
| type dbHarness struct { |
| t *testing.T |
| |
| stor *testutil.Storage |
| db *DB |
| o *opt.Options |
| ro *opt.ReadOptions |
| wo *opt.WriteOptions |
| } |
| |
| func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness { |
| h := new(dbHarness) |
| h.init(t, o) |
| return h |
| } |
| |
| func newDbHarness(t *testing.T) *dbHarness { |
| return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true}) |
| } |
| |
| func (h *dbHarness) init(t *testing.T, o *opt.Options) { |
| gomega.RegisterTestingT(t) |
| h.t = t |
| h.stor = testutil.NewStorage() |
| h.stor.OnLog(testingLogger(t)) |
| h.stor.OnClose(testingPreserveOnFailed(t)) |
| h.o = o |
| h.ro = nil |
| h.wo = nil |
| |
| if err := h.openDB0(); err != nil { |
		// Deferred so that the storage is closed after the fatal message below.
| defer h.stor.Close() |
| h.t.Fatal("Open (init): got error: ", err) |
| } |
| } |
| |
| func (h *dbHarness) openDB0() (err error) { |
| h.t.Log("opening DB") |
| h.db, err = Open(h.stor, h.o) |
| return |
| } |
| |
| func (h *dbHarness) openDB() { |
| if err := h.openDB0(); err != nil { |
| h.t.Fatal("Open: got error: ", err) |
| } |
| } |
| |
| func (h *dbHarness) closeDB0() error { |
| h.t.Log("closing DB") |
| return h.db.Close() |
| } |
| |
| func (h *dbHarness) closeDB() { |
| if h.db != nil { |
| if err := h.closeDB0(); err != nil { |
| h.t.Error("Close: got error: ", err) |
| } |
| h.db = nil |
| } |
| h.stor.CloseCheck() |
| runtime.GC() |
| } |
| |
| func (h *dbHarness) reopenDB() { |
| if h.db != nil { |
| h.closeDB() |
| } |
| h.openDB() |
| } |
| |
| func (h *dbHarness) close() { |
| if h.db != nil { |
| h.closeDB0() |
| h.db = nil |
| } |
| h.stor.Close() |
| h.stor = nil |
| runtime.GC() |
| } |
| |
| func (h *dbHarness) openAssert(want bool) { |
| db, err := Open(h.stor, h.o) |
| if err != nil { |
| if want { |
| h.t.Error("Open: assert: got error: ", err) |
| } else { |
| h.t.Log("Open: assert: got error (expected): ", err) |
| } |
| } else { |
| if !want { |
| h.t.Error("Open: assert: expect error") |
| } |
| db.Close() |
| } |
| } |
| |
| func (h *dbHarness) write(batch *Batch) { |
| if err := h.db.Write(batch, h.wo); err != nil { |
| h.t.Error("Write: got error: ", err) |
| } |
| } |
| |
| func (h *dbHarness) put(key, value string) { |
| if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil { |
| h.t.Error("Put: got error: ", err) |
| } |
| } |
| |
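// putMulti writes the low/hi boundary keys n times, compacting the memdb
// after each pair so that every iteration produces a new table.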
| func (h *dbHarness) putMulti(n int, low, hi string) { |
| for i := 0; i < n; i++ { |
| h.put(low, "begin") |
| h.put(hi, "end") |
| h.compactMem() |
| } |
| } |
| |
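// maxNextLevelOverlappingBytes asserts that no table (above level 0) overlaps
// the next-deeper level by more than want bytes.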
| func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) { |
| t := h.t |
| db := h.db |
| |
| var ( |
| maxOverlaps int64 |
| maxLevel int |
| ) |
| v := db.s.version() |
| if len(v.levels) > 2 { |
| for i, tt := range v.levels[1 : len(v.levels)-1] { |
| level := i + 1 |
| next := v.levels[level+1] |
| for _, t := range tt { |
| r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false) |
| sum := r.size() |
| if sum > maxOverlaps { |
| maxOverlaps = sum |
| maxLevel = level |
| } |
| } |
| } |
| } |
| v.release() |
| |
| if maxOverlaps > want { |
| t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel) |
| } else { |
| t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want) |
| } |
| } |
| |
| func (h *dbHarness) delete(key string) { |
| t := h.t |
| db := h.db |
| |
| err := db.Delete([]byte(key), h.wo) |
| if err != nil { |
| t.Error("Delete: got error: ", err) |
| } |
| } |
| |
| func (h *dbHarness) assertNumKeys(want int) { |
| iter := h.db.NewIterator(nil, h.ro) |
| defer iter.Release() |
| got := 0 |
| for iter.Next() { |
| got++ |
| } |
| if err := iter.Error(); err != nil { |
| h.t.Error("assertNumKeys: ", err) |
| } |
| if want != got { |
| h.t.Errorf("assertNumKeys: want=%d got=%d", want, got) |
| } |
| } |
| |
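// getr looks up key on db and asserts that its presence matches expectFound.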
| func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) { |
| t := h.t |
| v, err := db.Get([]byte(key), h.ro) |
| switch err { |
| case ErrNotFound: |
| if expectFound { |
| t.Errorf("Get: key '%s' not found, want found", key) |
| } |
| case nil: |
| found = true |
| if !expectFound { |
| t.Errorf("Get: key '%s' found, want not found", key) |
| } |
| default: |
| t.Error("Get: got error: ", err) |
| } |
| return |
| } |
| |
| func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) { |
| return h.getr(h.db, key, expectFound) |
| } |
| |
| func (h *dbHarness) getValr(db Reader, key, value string) { |
| t := h.t |
| found, r := h.getr(db, key, true) |
| if !found { |
| return |
| } |
| rval := string(r) |
| if rval != value { |
| t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value) |
| } |
| } |
| |
| func (h *dbHarness) getVal(key, value string) { |
| h.getValr(h.db, key, value) |
| } |
| |
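// allEntriesFor renders every internal entry for key (values, "DEL" markers,
// or "CORRUPTED" for unparsable keys) as e.g. "[ v2, DEL, v1 ]" and compares
// the result against want.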
| func (h *dbHarness) allEntriesFor(key, want string) { |
| t := h.t |
| db := h.db |
| s := db.s |
| |
| ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal) |
| iter := db.newRawIterator(nil, nil, nil, nil) |
| if !iter.Seek(ikey) && iter.Error() != nil { |
| t.Error("AllEntries: error during seek, err: ", iter.Error()) |
| return |
| } |
| res := "[ " |
| first := true |
| for iter.Valid() { |
| if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil { |
| if s.icmp.uCompare(ikey.ukey(), ukey) != 0 { |
| break |
| } |
| if !first { |
| res += ", " |
| } |
| first = false |
| switch kt { |
| case keyTypeVal: |
| res += string(iter.Value()) |
| case keyTypeDel: |
| res += "DEL" |
| } |
| } else { |
| if !first { |
| res += ", " |
| } |
| first = false |
| res += "CORRUPTED" |
| } |
| iter.Next() |
| } |
| if !first { |
| res += " " |
| } |
| res += "]" |
| if res != want { |
| t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want) |
| } |
| } |
| |
// getKeyVal asserts that a fresh snapshot contains exactly the key/value
// pairs in want, formatted like "(k1->v1)(k2->v2)".
| func (h *dbHarness) getKeyVal(want string) { |
| t := h.t |
| db := h.db |
| |
| s, err := db.GetSnapshot() |
| if err != nil { |
| t.Fatal("GetSnapshot: got error: ", err) |
| } |
| res := "" |
| iter := s.NewIterator(nil, nil) |
| for iter.Next() { |
| res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value())) |
| } |
| iter.Release() |
| |
| if res != want { |
| t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want) |
| } |
| s.Release() |
| } |
| |
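// waitCompaction triggers a table compaction and blocks until it finishes.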
| func (h *dbHarness) waitCompaction() { |
| t := h.t |
| db := h.db |
| if err := db.compTriggerWait(db.tcompCmdC); err != nil { |
| t.Error("compaction error: ", err) |
| } |
| } |
| |
| func (h *dbHarness) waitMemCompaction() { |
| t := h.t |
| db := h.db |
| |
| if err := db.compTriggerWait(db.mcompCmdC); err != nil { |
| t.Error("compaction error: ", err) |
| } |
| } |
| |
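// compactMem forces the current memdb to be frozen and flushed to a table,
// failing the test if no table exists afterwards.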
| func (h *dbHarness) compactMem() { |
| t := h.t |
| db := h.db |
| |
| t.Log("starting memdb compaction") |
| |
| db.writeLockC <- struct{}{} |
| defer func() { |
| <-db.writeLockC |
| }() |
| |
| if _, err := db.rotateMem(0, true); err != nil { |
| t.Error("compaction error: ", err) |
| } |
| |
| if h.totalTables() == 0 { |
| t.Error("zero tables after mem compaction") |
| } |
| |
| t.Log("memdb compaction done") |
| } |
| |
| func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) { |
| t := h.t |
| db := h.db |
| |
| var _min, _max []byte |
| if min != "" { |
| _min = []byte(min) |
| } |
| if max != "" { |
| _max = []byte(max) |
| } |
| |
| t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max) |
| |
| if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil { |
| if wanterr { |
| t.Log("CompactRangeAt: got error (expected): ", err) |
| } else { |
| t.Error("CompactRangeAt: got error: ", err) |
| } |
| } else if wanterr { |
| t.Error("CompactRangeAt: expect error") |
| } |
| |
| t.Log("table range compaction done") |
| } |
| |
| func (h *dbHarness) compactRangeAt(level int, min, max string) { |
| h.compactRangeAtErr(level, min, max, false) |
| } |
| |
| func (h *dbHarness) compactRange(min, max string) { |
| t := h.t |
| db := h.db |
| |
| t.Logf("starting DB range compaction: min=%q, max=%q", min, max) |
| |
| var r util.Range |
| if min != "" { |
| r.Start = []byte(min) |
| } |
| if max != "" { |
| r.Limit = []byte(max) |
| } |
| if err := db.CompactRange(r); err != nil { |
| t.Error("CompactRange: got error: ", err) |
| } |
| |
| t.Log("DB range compaction done") |
| } |
| |
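// sizeOf returns the approximate size of the key range [start, limit) as
// reported by DB.SizeOf.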
| func (h *dbHarness) sizeOf(start, limit string) int64 { |
| sz, err := h.db.SizeOf([]util.Range{ |
		{Start: []byte(start), Limit: []byte(limit)},
| }) |
| if err != nil { |
| h.t.Error("SizeOf: got error: ", err) |
| } |
| return sz.Sum() |
| } |
| |
| func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) { |
| sz := h.sizeOf(start, limit) |
| if sz < low || sz > hi { |
| h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d", |
| shorten(start), shorten(limit), low, hi, sz) |
| } |
| } |
| |
| func (h *dbHarness) getSnapshot() (s *Snapshot) { |
| s, err := h.db.GetSnapshot() |
| if err != nil { |
| h.t.Fatal("GetSnapshot: got error: ", err) |
| } |
| return |
| } |
| |
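// getTablesPerLevel returns the per-level table counts as a comma-separated
// string with trailing empty levels trimmed, e.g. "2,1,1".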
| func (h *dbHarness) getTablesPerLevel() string { |
| res := "" |
| nz := 0 |
| v := h.db.s.version() |
| for level, tables := range v.levels { |
| if level > 0 { |
| res += "," |
| } |
| res += fmt.Sprint(len(tables)) |
| if len(tables) > 0 { |
| nz = len(res) |
| } |
| } |
| v.release() |
| return res[:nz] |
| } |
| |
| func (h *dbHarness) tablesPerLevel(want string) { |
| res := h.getTablesPerLevel() |
| if res != want { |
| h.t.Errorf("invalid tables len, want=%s, got=%s", want, res) |
| } |
| } |
| |
| func (h *dbHarness) totalTables() (n int) { |
| v := h.db.s.version() |
| for _, tables := range v.levels { |
| n += len(tables) |
| } |
| v.release() |
| return |
| } |
| |
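// keyValue is the minimal key/value accessor implemented by iterators,
// used by testKeyVal.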
| type keyValue interface { |
| Key() []byte |
| Value() []byte |
| } |
| |
| func testKeyVal(t *testing.T, kv keyValue, want string) { |
| res := string(kv.Key()) + "->" + string(kv.Value()) |
| if res != want { |
| t.Errorf("invalid key/value, want=%q, got=%q", want, res) |
| } |
| } |
| |
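// numKey formats num as a fixed-width key like "key000042".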
| func numKey(num int) string { |
| return fmt.Sprintf("key%06d", num) |
| } |
| |
| var testingBloomFilter = filter.NewBloomFilter(10) |
| |
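// truno runs f under four configurations: the options as given, with a bloom
// filter added, with compression additionally disabled, and finally with the
// DB reopened before f runs.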
| func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) { |
| for i := 0; i < 4; i++ { |
| func() { |
| switch i { |
| case 0: |
| case 1: |
| if o == nil { |
| o = &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Filter: testingBloomFilter, |
| } |
| } else { |
| old := o |
| o = &opt.Options{} |
| *o = *old |
| o.Filter = testingBloomFilter |
| } |
| case 2: |
| if o == nil { |
| o = &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| } |
| } else { |
| old := o |
| o = &opt.Options{} |
| *o = *old |
| o.Compression = opt.NoCompression |
| } |
| } |
| h := newDbHarnessWopt(t, o) |
| defer h.close() |
| switch i { |
| case 3: |
| h.reopenDB() |
| } |
| f(h) |
| }() |
| } |
| } |
| |
| func trun(t *testing.T, f func(h *dbHarness)) { |
| truno(t, nil, f) |
| } |
| |
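// testAligned asserts that a struct field offset is 8-byte aligned, which
// sync/atomic requires for 64-bit operations on 32-bit platforms.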
| func testAligned(t *testing.T, name string, offset uintptr) { |
| if offset%8 != 0 { |
| t.Errorf("field %s offset is not 64-bit aligned", name) |
| } |
| } |
| |
| func Test_FieldsAligned(t *testing.T) { |
| p1 := new(DB) |
| testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq)) |
| p2 := new(session) |
| testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum)) |
| testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum)) |
| testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum)) |
| testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum)) |
| } |
| |
| func TestDB_Locking(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.stor.Close() |
| h.openAssert(false) |
| h.closeDB() |
| h.openAssert(true) |
| } |
| |
| func TestDB_Empty(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.get("foo", false) |
| |
| h.reopenDB() |
| h.get("foo", false) |
| }) |
| } |
| |
| func TestDB_ReadWrite(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.getVal("foo", "v1") |
| h.put("bar", "v2") |
| h.put("foo", "v3") |
| h.getVal("foo", "v3") |
| h.getVal("bar", "v2") |
| |
| h.reopenDB() |
| h.getVal("foo", "v3") |
| h.getVal("bar", "v2") |
| }) |
| } |
| |
| func TestDB_PutDeleteGet(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.getVal("foo", "v1") |
| h.put("foo", "v2") |
| h.getVal("foo", "v2") |
| h.delete("foo") |
| h.get("foo", false) |
| |
| h.reopenDB() |
| h.get("foo", false) |
| }) |
| } |
| |
| func TestDB_EmptyBatch(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.get("foo", false) |
| err := h.db.Write(new(Batch), h.wo) |
| if err != nil { |
		t.Error("writing empty batch yielded error: ", err)
| } |
| h.get("foo", false) |
| } |
| |
| func TestDB_GetFromFrozen(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 100100, |
| }) |
| defer h.close() |
| |
| h.put("foo", "v1") |
| h.getVal("foo", "v1") |
| |
| h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls |
| h.put("k1", strings.Repeat("x", 100000)) // Fill memtable |
| h.put("k2", strings.Repeat("y", 100000)) // Trigger compaction |
| for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ { |
| time.Sleep(10 * time.Microsecond) |
| } |
| if h.db.getFrozenMem() == nil { |
| h.stor.Release(testutil.ModeSync, storage.TypeTable) |
| t.Fatal("No frozen mem") |
| } |
| h.getVal("foo", "v1") |
| h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls |
| |
| h.reopenDB() |
| h.getVal("foo", "v1") |
| h.get("k1", true) |
| h.get("k2", true) |
| } |
| |
| func TestDB_GetFromTable(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.compactMem() |
| h.getVal("foo", "v1") |
| }) |
| } |
| |
| func TestDB_GetSnapshot(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| bar := strings.Repeat("b", 200) |
| h.put("foo", "v1") |
| h.put(bar, "v1") |
| |
| snap, err := h.db.GetSnapshot() |
| if err != nil { |
| t.Fatal("GetSnapshot: got error: ", err) |
| } |
| |
| h.put("foo", "v2") |
| h.put(bar, "v2") |
| |
| h.getVal("foo", "v2") |
| h.getVal(bar, "v2") |
| h.getValr(snap, "foo", "v1") |
| h.getValr(snap, bar, "v1") |
| |
| h.compactMem() |
| |
| h.getVal("foo", "v2") |
| h.getVal(bar, "v2") |
| h.getValr(snap, "foo", "v1") |
| h.getValr(snap, bar, "v1") |
| |
| snap.Release() |
| |
| h.reopenDB() |
| h.getVal("foo", "v2") |
| h.getVal(bar, "v2") |
| }) |
| } |
| |
| func TestDB_GetLevel0Ordering(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.db.memdbMaxLevel = 2 |
| |
| for i := 0; i < 4; i++ { |
| h.put("bar", fmt.Sprintf("b%d", i)) |
| h.put("foo", fmt.Sprintf("v%d", i)) |
| h.compactMem() |
| } |
| h.getVal("foo", "v3") |
| h.getVal("bar", "b3") |
| |
| v := h.db.s.version() |
| t0len := v.tLen(0) |
| v.release() |
| if t0len < 2 { |
| t.Errorf("level-0 tables is less than 2, got %d", t0len) |
| } |
| |
| h.reopenDB() |
| h.getVal("foo", "v3") |
| h.getVal("bar", "b3") |
| }) |
| } |
| |
| func TestDB_GetOrderedByLevels(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.compactMem() |
| h.compactRange("a", "z") |
| h.getVal("foo", "v1") |
| h.put("foo", "v2") |
| h.compactMem() |
| h.getVal("foo", "v2") |
| }) |
| } |
| |
| func TestDB_GetPicksCorrectFile(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| // Arrange to have multiple files in a non-level-0 level. |
| h.put("a", "va") |
| h.compactMem() |
| h.compactRange("a", "b") |
| h.put("x", "vx") |
| h.compactMem() |
| h.compactRange("x", "y") |
| h.put("f", "vf") |
| h.compactMem() |
| h.compactRange("f", "g") |
| |
| h.getVal("a", "va") |
| h.getVal("f", "vf") |
| h.getVal("x", "vx") |
| |
| h.compactRange("", "") |
| h.getVal("a", "va") |
| h.getVal("f", "vf") |
| h.getVal("x", "vx") |
| }) |
| } |
| |
| func TestDB_GetEncountersEmptyLevel(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.db.memdbMaxLevel = 2 |
| |
| // Arrange for the following to happen: |
| // * sstable A in level 0 |
| // * nothing in level 1 |
| // * sstable B in level 2 |
| // Then do enough Get() calls to arrange for an automatic compaction |
| // of sstable A. A bug would cause the compaction to be marked as |
		// occurring at level 1 (instead of the correct level 0).
| |
| // Step 1: First place sstables in levels 0 and 2 |
| for i := 0; ; i++ { |
| if i >= 100 { |
| t.Fatal("could not fill levels-0 and level-2") |
| } |
| v := h.db.s.version() |
| if v.tLen(0) > 0 && v.tLen(2) > 0 { |
| v.release() |
| break |
| } |
| v.release() |
| h.put("a", "begin") |
| h.put("z", "end") |
| h.compactMem() |
| |
| h.getVal("a", "begin") |
| h.getVal("z", "end") |
| } |
| |
| // Step 2: clear level 1 if necessary. |
| h.compactRangeAt(1, "", "") |
| h.tablesPerLevel("1,0,1") |
| |
| h.getVal("a", "begin") |
| h.getVal("z", "end") |
| |
| // Step 3: read a bunch of times |
| for i := 0; i < 200; i++ { |
| h.get("missing", false) |
| } |
| |
| // Step 4: Wait for compaction to finish |
| h.waitCompaction() |
| |
| v := h.db.s.version() |
| if v.tLen(0) > 0 { |
| t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) |
| } |
| v.release() |
| |
| h.getVal("a", "begin") |
| h.getVal("z", "end") |
| }) |
| } |
| |
| func TestDB_IterMultiWithDelete(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("a", "va") |
| h.put("b", "vb") |
| h.put("c", "vc") |
| h.delete("b") |
| h.get("b", false) |
| |
| iter := h.db.NewIterator(nil, nil) |
| iter.Seek([]byte("c")) |
| testKeyVal(t, iter, "c->vc") |
| iter.Prev() |
| testKeyVal(t, iter, "a->va") |
| iter.Release() |
| |
| h.compactMem() |
| |
| iter = h.db.NewIterator(nil, nil) |
| iter.Seek([]byte("c")) |
| testKeyVal(t, iter, "c->vc") |
| iter.Prev() |
| testKeyVal(t, iter, "a->va") |
| iter.Release() |
| }) |
| } |
| |
| func TestDB_IteratorPinsRef(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("foo", "hello") |
| |
| // Get iterator that will yield the current contents of the DB. |
| iter := h.db.NewIterator(nil, nil) |
| |
| // Write to force compactions |
| h.put("foo", "newvalue1") |
| for i := 0; i < 100; i++ { |
| h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) |
| } |
| h.put("foo", "newvalue2") |
| |
| iter.First() |
| testKeyVal(t, iter, "foo->hello") |
| if iter.Next() { |
		t.Errorf("expected EOF")
| } |
| iter.Release() |
| } |
| |
| func TestDB_Recover(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.put("baz", "v5") |
| |
| h.reopenDB() |
| h.getVal("foo", "v1") |
| |
| h.getVal("foo", "v1") |
| h.getVal("baz", "v5") |
| h.put("bar", "v2") |
| h.put("foo", "v3") |
| |
| h.reopenDB() |
| h.getVal("foo", "v3") |
| h.put("foo", "v4") |
| h.getVal("foo", "v4") |
| h.getVal("bar", "v2") |
| h.getVal("baz", "v5") |
| }) |
| } |
| |
| func TestDB_RecoverWithEmptyJournal(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| h.put("foo", "v2") |
| |
| h.reopenDB() |
| h.reopenDB() |
| h.put("foo", "v3") |
| |
| h.reopenDB() |
| h.getVal("foo", "v3") |
| }) |
| } |
| |
| func TestDB_RecoverDuringMemtableCompaction(t *testing.T) { |
| truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) { |
| |
| h.stor.Stall(testutil.ModeSync, storage.TypeTable) |
| h.put("big1", strings.Repeat("x", 10000000)) |
| h.put("big2", strings.Repeat("y", 1000)) |
| h.put("bar", "v2") |
| h.stor.Release(testutil.ModeSync, storage.TypeTable) |
| |
| h.reopenDB() |
| h.getVal("bar", "v2") |
| h.getVal("big1", strings.Repeat("x", 10000000)) |
| h.getVal("big2", strings.Repeat("y", 1000)) |
| }) |
| } |
| |
| func TestDB_MinorCompactionsHappen(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000}) |
| defer h.close() |
| |
| n := 500 |
| |
| key := func(i int) string { |
| return fmt.Sprintf("key%06d", i) |
| } |
| |
| for i := 0; i < n; i++ { |
| h.put(key(i), key(i)+strings.Repeat("v", 1000)) |
| } |
| |
| for i := 0; i < n; i++ { |
| h.getVal(key(i), key(i)+strings.Repeat("v", 1000)) |
| } |
| |
| h.reopenDB() |
| for i := 0; i < n; i++ { |
| h.getVal(key(i), key(i)+strings.Repeat("v", 1000)) |
| } |
| } |
| |
| func TestDB_RecoverWithLargeJournal(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("big1", strings.Repeat("1", 200000)) |
| h.put("big2", strings.Repeat("2", 200000)) |
| h.put("small3", strings.Repeat("3", 10)) |
| h.put("small4", strings.Repeat("4", 10)) |
| h.tablesPerLevel("") |
| |
	// Make sure that if we re-open with a small write buffer size, we flush
	// table files in the middle of a large journal file.
| h.o.WriteBuffer = 100000 |
| h.reopenDB() |
| h.getVal("big1", strings.Repeat("1", 200000)) |
| h.getVal("big2", strings.Repeat("2", 200000)) |
| h.getVal("small3", strings.Repeat("3", 10)) |
| h.getVal("small4", strings.Repeat("4", 10)) |
| v := h.db.s.version() |
| if v.tLen(0) <= 1 { |
		t.Errorf("level-0 tables is not greater than one, got %d", v.tLen(0))
| } |
| v.release() |
| } |
| |
| func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 10000000, |
| Compression: opt.NoCompression, |
| }) |
| defer h.close() |
| |
| v := h.db.s.version() |
| if v.tLen(0) > 0 { |
| t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) |
| } |
| v.release() |
| |
| n := 80 |
| |
| // Write 8MB (80 values, each 100K) |
| for i := 0; i < n; i++ { |
| h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) |
| } |
| |
| // Reopening moves updates to level-0 |
| h.reopenDB() |
| h.compactRangeAt(0, "", "") |
| |
| v = h.db.s.version() |
| if v.tLen(0) > 0 { |
| t.Errorf("level-0 tables more than 0, got %d", v.tLen(0)) |
| } |
| if v.tLen(1) <= 1 { |
		t.Errorf("level-1 tables is not greater than one, got %d", v.tLen(1))
| } |
| v.release() |
| |
| for i := 0; i < n; i++ { |
| h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10)) |
| } |
| } |
| |
| func TestDB_RepeatedWritesToSameKey(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000}) |
| defer h.close() |
| |
| maxTables := h.o.GetWriteL0PauseTrigger() + 7 |
| |
| value := strings.Repeat("v", 2*h.o.GetWriteBuffer()) |
| for i := 0; i < 5*maxTables; i++ { |
| h.put("key", value) |
| n := h.totalTables() |
| if n > maxTables { |
| t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i) |
| } |
| } |
| } |
| |
| func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 100000, |
| }) |
| defer h.close() |
| |
| h.reopenDB() |
| |
| maxTables := h.o.GetWriteL0PauseTrigger() + 7 |
| |
| value := strings.Repeat("v", 2*h.o.GetWriteBuffer()) |
| for i := 0; i < 5*maxTables; i++ { |
| h.put("key", value) |
| n := h.totalTables() |
| if n > maxTables { |
| t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i) |
| } |
| } |
| } |
| |
| func TestDB_SparseMerge(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression}) |
| defer h.close() |
| |
| h.putMulti(7, "A", "Z") |
| |
| // Suppose there is: |
| // small amount of data with prefix A |
| // large amount of data with prefix B |
| // small amount of data with prefix C |
| // and that recent updates have made small changes to all three prefixes. |
| // Check that we do not do a compaction that merges all of B in one shot. |
| h.put("A", "va") |
| value := strings.Repeat("x", 1000) |
| for i := 0; i < 100000; i++ { |
| h.put(fmt.Sprintf("B%010d", i), value) |
| } |
| h.put("C", "vc") |
| h.compactMem() |
| h.compactRangeAt(0, "", "") |
| h.waitCompaction() |
| |
| // Make sparse update |
| h.put("A", "va2") |
| h.put("B100", "bvalue2") |
| h.put("C", "vc2") |
| h.compactMem() |
| |
| h.waitCompaction() |
| h.maxNextLevelOverlappingBytes(20 * 1048576) |
| h.compactRangeAt(0, "", "") |
| h.waitCompaction() |
| h.maxNextLevelOverlappingBytes(20 * 1048576) |
| h.compactRangeAt(1, "", "") |
| h.waitCompaction() |
| h.maxNextLevelOverlappingBytes(20 * 1048576) |
| } |
| |
| func TestDB_SizeOf(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| WriteBuffer: 10000000, |
| }) |
| defer h.close() |
| |
| h.sizeAssert("", "xyz", 0, 0) |
| h.reopenDB() |
| h.sizeAssert("", "xyz", 0, 0) |
| |
| // Write 8MB (80 values, each 100K) |
| n := 80 |
| s1 := 100000 |
| s2 := 105000 |
| |
| for i := 0; i < n; i++ { |
| h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10)) |
| } |
| |
| // 0 because SizeOf() does not account for memtable space |
| h.sizeAssert("", numKey(50), 0, 0) |
| |
| for r := 0; r < 3; r++ { |
| h.reopenDB() |
| |
| for cs := 0; cs < n; cs += 10 { |
| for i := 0; i < n; i += 10 { |
| h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i)) |
| h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1))) |
| h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10)) |
| } |
| |
| h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50)) |
| h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50)) |
| |
| h.compactRangeAt(0, numKey(cs), numKey(cs+9)) |
| } |
| |
| v := h.db.s.version() |
| if v.tLen(0) != 0 { |
| t.Errorf("level-0 tables was not zero, got %d", v.tLen(0)) |
| } |
| if v.tLen(1) == 0 { |
| t.Error("level-1 tables was zero") |
| } |
| v.release() |
| } |
| } |
| |
| func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| }) |
| defer h.close() |
| |
| sizes := []int64{ |
| 10000, |
| 10000, |
| 100000, |
| 10000, |
| 100000, |
| 10000, |
| 300000, |
| 10000, |
| } |
| |
| for i, n := range sizes { |
| h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10)) |
| } |
| |
| for r := 0; r < 3; r++ { |
| h.reopenDB() |
| |
| var x int64 |
| for i, n := range sizes { |
| y := x |
| if i > 0 { |
| y += 1000 |
| } |
| h.sizeAssert("", numKey(i), x, y) |
| x += n |
| } |
| |
| h.sizeAssert(numKey(3), numKey(5), 110000, 111000) |
| |
| h.compactRangeAt(0, "", "") |
| } |
| } |
| |
| func TestDB_Snapshot(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.put("foo", "v1") |
| s1 := h.getSnapshot() |
| h.put("foo", "v2") |
| s2 := h.getSnapshot() |
| h.put("foo", "v3") |
| s3 := h.getSnapshot() |
| h.put("foo", "v4") |
| |
| h.getValr(s1, "foo", "v1") |
| h.getValr(s2, "foo", "v2") |
| h.getValr(s3, "foo", "v3") |
| h.getVal("foo", "v4") |
| |
| s3.Release() |
| h.getValr(s1, "foo", "v1") |
| h.getValr(s2, "foo", "v2") |
| h.getVal("foo", "v4") |
| |
| s1.Release() |
| h.getValr(s2, "foo", "v2") |
| h.getVal("foo", "v4") |
| |
| s2.Release() |
| h.getVal("foo", "v4") |
| }) |
| } |
| |
| func TestDB_SnapshotList(t *testing.T) { |
| db := &DB{snapsList: list.New()} |
| e0a := db.acquireSnapshot() |
| e0b := db.acquireSnapshot() |
| db.seq = 1 |
| e1 := db.acquireSnapshot() |
| db.seq = 2 |
| e2 := db.acquireSnapshot() |
| |
| if db.minSeq() != 0 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| db.releaseSnapshot(e0a) |
| if db.minSeq() != 0 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| db.releaseSnapshot(e2) |
| if db.minSeq() != 0 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| db.releaseSnapshot(e0b) |
| if db.minSeq() != 1 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| e2 = db.acquireSnapshot() |
| if db.minSeq() != 1 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| db.releaseSnapshot(e1) |
| if db.minSeq() != 2 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| db.releaseSnapshot(e2) |
| if db.minSeq() != 2 { |
| t.Fatalf("invalid sequence number, got=%d", db.minSeq()) |
| } |
| } |
| |
| func TestDB_HiddenValuesAreRemoved(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| s := h.db.s |
| |
| m := 2 |
| h.db.memdbMaxLevel = m |
| |
| h.put("foo", "v1") |
| h.compactMem() |
| v := s.version() |
| num := v.tLen(m) |
| v.release() |
| if num != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m, num) |
| } |
| |
| // Place a table at level last-1 to prevent merging with preceding mutation |
| h.put("a", "begin") |
| h.put("z", "end") |
| h.compactMem() |
| v = s.version() |
| if v.tLen(m) != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m)) |
| } |
| if v.tLen(m-1) != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1)) |
| } |
| v.release() |
| |
| h.delete("foo") |
| h.put("foo", "v2") |
| h.allEntriesFor("foo", "[ v2, DEL, v1 ]") |
| h.compactMem() |
| h.allEntriesFor("foo", "[ v2, DEL, v1 ]") |
| h.compactRangeAt(m-2, "", "z") |
| // DEL eliminated, but v1 remains because we aren't compacting that level |
| // (DEL can be eliminated because v2 hides v1). |
| h.allEntriesFor("foo", "[ v2, v1 ]") |
| h.compactRangeAt(m-1, "", "") |
		// Merging last-1 with last makes us the base level for "foo", so the
		// DEL marker is removed (as is v1).
| h.allEntriesFor("foo", "[ v2 ]") |
| }) |
| } |
| |
| func TestDB_DeletionMarkers2(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| s := h.db.s |
| |
| m := 2 |
| h.db.memdbMaxLevel = m |
| |
| h.put("foo", "v1") |
| h.compactMem() |
| v := s.version() |
| num := v.tLen(m) |
| v.release() |
| if num != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m, num) |
| } |
| |
| // Place a table at level last-1 to prevent merging with preceding mutation |
| h.put("a", "begin") |
| h.put("z", "end") |
| h.compactMem() |
| v = s.version() |
| if v.tLen(m) != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m)) |
| } |
| if v.tLen(m-1) != 1 { |
| t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1)) |
| } |
| v.release() |
| |
| h.delete("foo") |
| h.allEntriesFor("foo", "[ DEL, v1 ]") |
| h.compactMem() // Moves to level last-2 |
| h.allEntriesFor("foo", "[ DEL, v1 ]") |
| h.compactRangeAt(m-2, "", "") |
| // DEL kept: "last" file overlaps |
| h.allEntriesFor("foo", "[ DEL, v1 ]") |
| h.compactRangeAt(m-1, "", "") |
	// Merging last-1 with last makes us the base level for "foo", so the
	// DEL marker is removed (as is v1).
| h.allEntriesFor("foo", "[ ]") |
| } |
| |
| func TestDB_CompactionTableOpenError(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| OpenFilesCacheCapacity: -1, |
| }) |
| defer h.close() |
| |
| h.db.memdbMaxLevel = 2 |
| |
| im := 10 |
| jm := 10 |
| for r := 0; r < 2; r++ { |
| for i := 0; i < im; i++ { |
| for j := 0; j < jm; j++ { |
| h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j)) |
| } |
| h.compactMem() |
| } |
| } |
| |
| if n := h.totalTables(); n != im*2 { |
| t.Errorf("total tables is %d, want %d", n, im*2) |
| } |
| |
| h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction")) |
| go h.db.CompactRange(util.Range{}) |
| if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil { |
| t.Log("compaction error: ", err) |
| } |
| h.closeDB0() |
| h.openDB() |
| h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil) |
| |
| for i := 0; i < im; i++ { |
| for j := 0; j < jm; j++ { |
| h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j)) |
| } |
| } |
| } |
| |
| func TestDB_OverlapInLevel0(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| h.db.memdbMaxLevel = 2 |
| |
| // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. |
| h.put("100", "v100") |
| h.put("999", "v999") |
| h.compactMem() |
| h.delete("100") |
| h.delete("999") |
| h.compactMem() |
| h.tablesPerLevel("0,1,1") |
| |
| // Make files spanning the following ranges in level-0: |
| // files[0] 200 .. 900 |
| // files[1] 300 .. 500 |
| // Note that files are sorted by min key. |
| h.put("300", "v300") |
| h.put("500", "v500") |
| h.compactMem() |
| h.put("200", "v200") |
| h.put("600", "v600") |
| h.put("900", "v900") |
| h.compactMem() |
| h.tablesPerLevel("2,1,1") |
| |
| // Compact away the placeholder files we created initially |
| h.compactRangeAt(1, "", "") |
| h.compactRangeAt(2, "", "") |
| h.tablesPerLevel("2") |
| |
| // Do a memtable compaction. Before bug-fix, the compaction would |
| // not detect the overlap with level-0 files and would incorrectly place |
| // the deletion in a deeper level. |
| h.delete("600") |
| h.compactMem() |
| h.tablesPerLevel("3") |
| h.get("600", false) |
| }) |
| } |
| |
| func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.reopenDB() |
| h.put("b", "v") |
| h.reopenDB() |
| h.delete("b") |
| h.delete("a") |
| h.reopenDB() |
| h.delete("a") |
| h.reopenDB() |
| h.put("a", "v") |
| h.reopenDB() |
| h.reopenDB() |
| h.getKeyVal("(a->v)") |
| h.waitCompaction() |
| h.getKeyVal("(a->v)") |
| } |
| |
| func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.reopenDB() |
| h.put("", "") |
| h.reopenDB() |
| h.delete("e") |
| h.put("", "") |
| h.reopenDB() |
| h.put("c", "cv") |
| h.reopenDB() |
| h.put("", "") |
| h.reopenDB() |
| h.put("", "") |
| h.waitCompaction() |
| h.reopenDB() |
| h.put("d", "dv") |
| h.reopenDB() |
| h.put("", "") |
| h.reopenDB() |
| h.delete("d") |
| h.delete("b") |
| h.reopenDB() |
| h.getKeyVal("(->)(c->cv)") |
| h.waitCompaction() |
| h.getKeyVal("(->)(c->cv)") |
| } |
| |
| func TestDB_SingleEntryMemCompaction(t *testing.T) { |
| trun(t, func(h *dbHarness) { |
| for i := 0; i < 10; i++ { |
| h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer)) |
| h.compactMem() |
| h.put("key", strings.Repeat("v", opt.DefaultBlockSize)) |
| h.compactMem() |
| h.put("k", "v") |
| h.compactMem() |
| h.put("", "") |
| h.compactMem() |
| h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2)) |
| h.compactMem() |
| } |
| }) |
| } |
| |
| func TestDB_ManifestWriteError(t *testing.T) { |
| for i := 0; i < 2; i++ { |
| func() { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("foo", "bar") |
| h.getVal("foo", "bar") |
| |
| // Mem compaction (will succeed) |
| h.compactMem() |
| h.getVal("foo", "bar") |
| v := h.db.s.version() |
| if n := v.tLen(0); n != 1 { |
				t.Errorf("invalid level-0 tables, want=1 got=%d", n)
| } |
| v.release() |
| |
| if i == 0 { |
| h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error")) |
| } else { |
| h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error")) |
| } |
| |
| // Merging compaction (will fail) |
| h.compactRangeAtErr(0, "", "", true) |
| |
| h.db.Close() |
| h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil) |
| h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil) |
| |
| // Should not lose data |
| h.openDB() |
| h.getVal("foo", "bar") |
| }() |
| } |
| } |
| |
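// assertErr fails the test when the presence of err does not match wanterr.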
| func assertErr(t *testing.T, err error, wanterr bool) { |
| if err != nil { |
| if wanterr { |
| t.Log("AssertErr: got error (expected): ", err) |
| } else { |
| t.Error("AssertErr: got error: ", err) |
| } |
| } else if wanterr { |
| t.Error("AssertErr: expect error") |
| } |
| } |
| |
| func TestDB_ClosedIsClosed(t *testing.T) { |
| h := newDbHarness(t) |
| db := h.db |
| |
| var iter, iter2 iterator.Iterator |
| var snap *Snapshot |
| func() { |
| defer h.close() |
| |
| h.put("k", "v") |
| h.getVal("k", "v") |
| |
| iter = db.NewIterator(nil, h.ro) |
| iter.Seek([]byte("k")) |
| testKeyVal(t, iter, "k->v") |
| |
| var err error |
| snap, err = db.GetSnapshot() |
| if err != nil { |
| t.Fatal("GetSnapshot: got error: ", err) |
| } |
| |
| h.getValr(snap, "k", "v") |
| |
| iter2 = snap.NewIterator(nil, h.ro) |
| iter2.Seek([]byte("k")) |
| testKeyVal(t, iter2, "k->v") |
| |
| h.put("foo", "v2") |
| h.delete("foo") |
| |
| // closing DB |
| iter.Release() |
| iter2.Release() |
| }() |
| |
| assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true) |
| _, err := db.Get([]byte("k"), h.ro) |
| assertErr(t, err, true) |
| |
| if iter.Valid() { |
		t.Errorf("iter.Valid should be false")
| } |
| assertErr(t, iter.Error(), false) |
| testKeyVal(t, iter, "->") |
| if iter.Seek([]byte("k")) { |
		t.Errorf("iter.Seek should be false")
| } |
| assertErr(t, iter.Error(), true) |
| |
| assertErr(t, iter2.Error(), false) |
| |
| _, err = snap.Get([]byte("k"), h.ro) |
| assertErr(t, err, true) |
| |
| _, err = db.GetSnapshot() |
| assertErr(t, err, true) |
| |
| iter3 := db.NewIterator(nil, h.ro) |
| assertErr(t, iter3.Error(), true) |
| |
| iter3 = snap.NewIterator(nil, h.ro) |
| assertErr(t, iter3.Error(), true) |
| |
| assertErr(t, db.Delete([]byte("k"), h.wo), true) |
| |
| _, err = db.GetProperty("leveldb.stats") |
| assertErr(t, err, true) |
| |
	_, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
| assertErr(t, err, true) |
| |
| assertErr(t, db.CompactRange(util.Range{}), true) |
| |
| assertErr(t, db.Close(), true) |
| } |
| |
| type numberComparer struct{} |
| |
| func (numberComparer) num(x []byte) (n int) { |
| fmt.Sscan(string(x[1:len(x)-1]), &n) |
| return |
| } |
| |
| func (numberComparer) Name() string { |
| return "test.NumberComparer" |
| } |
| |
| func (p numberComparer) Compare(a, b []byte) int { |
| return p.num(a) - p.num(b) |
| } |
| |
| func (numberComparer) Separator(dst, a, b []byte) []byte { return nil } |
| func (numberComparer) Successor(dst, b []byte) []byte { return nil } |
| |
| func TestDB_CustomComparer(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Comparer: numberComparer{}, |
| WriteBuffer: 1000, |
| }) |
| defer h.close() |
| |
| h.put("[10]", "ten") |
| h.put("[0x14]", "twenty") |
| for i := 0; i < 2; i++ { |
| h.getVal("[10]", "ten") |
| h.getVal("[0xa]", "ten") |
| h.getVal("[20]", "twenty") |
| h.getVal("[0x14]", "twenty") |
| h.get("[15]", false) |
| h.get("[0xf]", false) |
| h.compactMem() |
| h.compactRange("[0]", "[9999]") |
| } |
| |
| for n := 0; n < 2; n++ { |
| for i := 0; i < 100; i++ { |
| v := fmt.Sprintf("[%d]", i*10) |
| h.put(v, v) |
| } |
| h.compactMem() |
| h.compactRange("[0]", "[1000000]") |
| } |
| } |
| |
| func TestDB_ManualCompaction(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.db.memdbMaxLevel = 2 |
| |
| h.putMulti(3, "p", "q") |
| h.tablesPerLevel("1,1,1") |
| |
| // Compaction range falls before files |
| h.compactRange("", "c") |
| h.tablesPerLevel("1,1,1") |
| |
| // Compaction range falls after files |
| h.compactRange("r", "z") |
| h.tablesPerLevel("1,1,1") |
| |
| // Compaction range overlaps files |
| h.compactRange("p1", "p9") |
| h.tablesPerLevel("0,0,1") |
| |
| // Populate a different range |
| h.putMulti(3, "c", "e") |
| h.tablesPerLevel("1,1,2") |
| |
| // Compact just the new range |
| h.compactRange("b", "f") |
| h.tablesPerLevel("0,0,2") |
| |
| // Compact all |
| h.putMulti(1, "a", "z") |
| h.tablesPerLevel("0,1,2") |
| h.compactRange("", "") |
| h.tablesPerLevel("0,0,1") |
| } |
| |
| func TestDB_BloomFilter(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| DisableBlockCache: true, |
| Filter: filter.NewBloomFilter(10), |
| }) |
| defer h.close() |
| |
| key := func(i int) string { |
| return fmt.Sprintf("key%06d", i) |
| } |
| |
| const n = 10000 |
| |
| // Populate multiple layers |
| for i := 0; i < n; i++ { |
| h.put(key(i), key(i)) |
| } |
| h.compactMem() |
| h.compactRange("a", "z") |
| for i := 0; i < n; i += 100 { |
| h.put(key(i), key(i)) |
| } |
| h.compactMem() |
| |
| // Prevent auto compactions triggered by seeks |
| h.stor.Stall(testutil.ModeSync, storage.TypeTable) |
| |
| // Lookup present keys. Should rarely read from small sstable. |
| h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable) |
| for i := 0; i < n; i++ { |
| h.getVal(key(i), key(i)) |
| } |
| cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable) |
	t.Logf("lookup of %d present keys yielded %d sstable I/O reads", n, cnt)
| if min, max := n, n+2*n/100; cnt < min || cnt > max { |
| t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt) |
| } |
| |
| // Lookup missing keys. Should rarely read from either sstable. |
| h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable) |
| for i := 0; i < n; i++ { |
| h.get(key(i)+".missing", false) |
| } |
| cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable) |
	t.Logf("lookup of %d missing keys yielded %d sstable I/O reads", n, cnt)
| if max := 3 * n / 100; cnt > max { |
| t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt) |
| } |
| |
| h.stor.Release(testutil.ModeSync, storage.TypeTable) |
| } |
| |
| func TestDB_Concurrent(t *testing.T) { |
| const n, secs, maxkey = 4, 2, 1000 |
| |
| runtime.GOMAXPROCS(n) |
| trun(t, func(h *dbHarness) { |
| var closeWg sync.WaitGroup |
| var stop uint32 |
| var cnt [n]uint32 |
| |
| for i := 0; i < n; i++ { |
| closeWg.Add(1) |
| go func(i int) { |
| var put, get, found uint |
| defer func() { |
| t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d", |
| i, cnt[i], put, get, found, get-found) |
| closeWg.Done() |
| }() |
| |
| rnd := rand.New(rand.NewSource(int64(1000 + i))) |
| for atomic.LoadUint32(&stop) == 0 { |
| x := cnt[i] |
| |
| k := rnd.Intn(maxkey) |
| kstr := fmt.Sprintf("%016d", k) |
| |
| if (rnd.Int() % 2) > 0 { |
| put++ |
| h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x)) |
| } else { |
| get++ |
| v, err := h.db.Get([]byte(kstr), h.ro) |
| if err == nil { |
| found++ |
| rk, ri, rx := 0, -1, uint32(0) |
| fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx) |
| if rk != k { |
| t.Errorf("invalid key want=%d got=%d", k, rk) |
| } |
| if ri < 0 || ri >= n { |
| t.Error("invalid goroutine number: ", ri) |
| } else { |
| tx := atomic.LoadUint32(&(cnt[ri])) |
| if rx > tx { |
| t.Errorf("invalid seq number, %d > %d ", rx, tx) |
| } |
| } |
| } else if err != ErrNotFound { |
| t.Error("Get: got error: ", err) |
| return |
| } |
| } |
| atomic.AddUint32(&cnt[i], 1) |
| } |
| }(i) |
| } |
| |
| time.Sleep(secs * time.Second) |
| atomic.StoreUint32(&stop, 1) |
| closeWg.Wait() |
| }) |
| |
| runtime.GOMAXPROCS(1) |
| } |
| |
| func TestDB_Concurrent2(t *testing.T) { |
| const n, n2 = 4, 4000 |
| |
| runtime.GOMAXPROCS(n*2 + 2) |
| truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) { |
| var closeWg sync.WaitGroup |
| var stop uint32 |
| |
| for i := 0; i < n; i++ { |
| closeWg.Add(1) |
| go func(i int) { |
| for k := 0; atomic.LoadUint32(&stop) == 0; k++ { |
| h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) |
| } |
| closeWg.Done() |
| }(i) |
| } |
| |
| for i := 0; i < n; i++ { |
| closeWg.Add(1) |
| go func(i int) { |
				for k := 1000000; atomic.LoadUint32(&stop) == 0; k-- {
| h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10)) |
| } |
| closeWg.Done() |
| }(i) |
| } |
| |
| cmp := comparer.DefaultComparer |
| for i := 0; i < n2; i++ { |
| closeWg.Add(1) |
| go func(i int) { |
| it := h.db.NewIterator(nil, nil) |
| var pk []byte |
| for it.Next() { |
| kk := it.Key() |
| if cmp.Compare(kk, pk) <= 0 { |
						t.Errorf("iter %d: key %q is not greater than previous key %q", i, kk, pk)
| } |
| pk = append(pk[:0], kk...) |
| var k, vk, vi int |
| if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil { |
| t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err) |
| } else if n < 1 { |
| t.Errorf("iter %d: Cannot parse key %q", i, it.Key()) |
| } |
| if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil { |
| t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err) |
| } else if n < 2 { |
| t.Errorf("iter %d: Cannot parse value %q", i, it.Value()) |
| } |
| |
| if vk != k { |
| t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk) |
| } |
| } |
| if err := it.Error(); err != nil { |
| t.Errorf("iter %d: Got error: %v", i, err) |
| } |
| it.Release() |
| closeWg.Done() |
| }(i) |
| } |
| |
| atomic.StoreUint32(&stop, 1) |
| closeWg.Wait() |
| }) |
| |
| runtime.GOMAXPROCS(1) |
| } |
| |
| func TestDB_CreateReopenDbOnFile(t *testing.T) { |
| dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid())) |
| if err := os.RemoveAll(dbpath); err != nil { |
| t.Fatal("cannot remove old db: ", err) |
| } |
| defer os.RemoveAll(dbpath) |
| |
| for i := 0; i < 3; i++ { |
| stor, err := storage.OpenFile(dbpath, false) |
| if err != nil { |
| t.Fatalf("(%d) cannot open storage: %s", i, err) |
| } |
| db, err := Open(stor, nil) |
| if err != nil { |
| t.Fatalf("(%d) cannot open db: %s", i, err) |
| } |
| if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil { |
| t.Fatalf("(%d) cannot write to db: %s", i, err) |
| } |
| if err := db.Close(); err != nil { |
| t.Fatalf("(%d) cannot close db: %s", i, err) |
| } |
| if err := stor.Close(); err != nil { |
| t.Fatalf("(%d) cannot close storage: %s", i, err) |
| } |
| } |
| } |
| |
| func TestDB_CreateReopenDbOnFile2(t *testing.T) { |
| dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid())) |
| if err := os.RemoveAll(dbpath); err != nil { |
| t.Fatal("cannot remove old db: ", err) |
| } |
| defer os.RemoveAll(dbpath) |
| |
| for i := 0; i < 3; i++ { |
| db, err := OpenFile(dbpath, nil) |
| if err != nil { |
| t.Fatalf("(%d) cannot open db: %s", i, err) |
| } |
| if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil { |
| t.Fatalf("(%d) cannot write to db: %s", i, err) |
| } |
| if err := db.Close(); err != nil { |
| t.Fatalf("(%d) cannot close db: %s", i, err) |
| } |
| } |
| } |
| |
| func TestDB_DeletionMarkersOnMemdb(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("foo", "v1") |
| h.compactMem() |
| h.delete("foo") |
| h.get("foo", false) |
| h.getKeyVal("") |
| } |
| |
| func TestDB_LeveldbIssue178(t *testing.T) { |
| nKeys := (opt.DefaultCompactionTableSize / 30) * 5 |
| key1 := func(i int) string { |
| return fmt.Sprintf("my_key_%d", i) |
| } |
| key2 := func(i int) string { |
| return fmt.Sprintf("my_key_%d_xxx", i) |
| } |
| |
| // Disable compression since it affects the creation of layers and the |
| // code below is trying to test against a very specific scenario. |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| }) |
| defer h.close() |
| |
| // Create first key range. |
| batch := new(Batch) |
| for i := 0; i < nKeys; i++ { |
| batch.Put([]byte(key1(i)), []byte("value for range 1 key")) |
| } |
| h.write(batch) |
| |
| // Create second key range. |
| batch.Reset() |
| for i := 0; i < nKeys; i++ { |
| batch.Put([]byte(key2(i)), []byte("value for range 2 key")) |
| } |
| h.write(batch) |
| |
| // Delete second key range. |
| batch.Reset() |
| for i := 0; i < nKeys; i++ { |
| batch.Delete([]byte(key2(i))) |
| } |
| h.write(batch) |
| h.waitMemCompaction() |
| |
| // Run manual compaction. |
| h.compactRange(key1(0), key1(nKeys-1)) |
| |
| // Checking the keys. |
| h.assertNumKeys(nKeys) |
| } |
| |
| func TestDB_LeveldbIssue200(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("1", "b") |
| h.put("2", "c") |
| h.put("3", "d") |
| h.put("4", "e") |
| h.put("5", "f") |
| |
| iter := h.db.NewIterator(nil, h.ro) |
| |
| // Add an element that should not be reflected in the iterator. |
| h.put("25", "cd") |
| |
| iter.Seek([]byte("5")) |
| assertBytes(t, []byte("5"), iter.Key()) |
| iter.Prev() |
| assertBytes(t, []byte("4"), iter.Key()) |
| iter.Prev() |
| assertBytes(t, []byte("3"), iter.Key()) |
| iter.Next() |
| assertBytes(t, []byte("4"), iter.Key()) |
| iter.Next() |
| assertBytes(t, []byte("5"), iter.Key()) |
| } |
| |
| func TestDB_GoleveldbIssue74(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 1 * opt.MiB, |
| }) |
| defer h.close() |
| |
| const n, dur = 10000, 5 * time.Second |
| |
| runtime.GOMAXPROCS(runtime.NumCPU()) |
| |
| until := time.Now().Add(dur) |
| wg := new(sync.WaitGroup) |
| wg.Add(2) |
| var done uint32 |
| go func() { |
| var i int |
| defer func() { |
| t.Logf("WRITER DONE #%d", i) |
| atomic.StoreUint32(&done, 1) |
| wg.Done() |
| }() |
| |
| b := new(Batch) |
| for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { |
| iv := fmt.Sprintf("VAL%010d", i) |
| for k := 0; k < n; k++ { |
| key := fmt.Sprintf("KEY%06d", k) |
| b.Put([]byte(key), []byte(key+iv)) |
| b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key)) |
| } |
| h.write(b) |
| |
| b.Reset() |
| snap := h.getSnapshot() |
| iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) |
| var k int |
| for ; iter.Next(); k++ { |
| ptrKey := iter.Key() |
| key := iter.Value() |
| |
| if _, err := snap.Get(ptrKey, nil); err != nil { |
| t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err) |
| } |
| if value, err := snap.Get(key, nil); err != nil { |
| t.Fatalf("WRITER #%d snapshot.Get %q: %v", i, key, err) |
| } else if string(value) != string(key)+iv { |
| t.Fatalf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value) |
| } |
| |
| b.Delete(key) |
| b.Delete(ptrKey) |
| } |
| h.write(b) |
| iter.Release() |
| snap.Release() |
| if k != n { |
| t.Fatalf("#%d %d != %d", i, k, n) |
| } |
| } |
| }() |
| go func() { |
| var i int |
| defer func() { |
| t.Logf("READER DONE #%d", i) |
| atomic.StoreUint32(&done, 1) |
| wg.Done() |
| }() |
| for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { |
| snap := h.getSnapshot() |
| iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil) |
| var prevValue string |
| var k int |
| for ; iter.Next(); k++ { |
| ptrKey := iter.Key() |
| key := iter.Value() |
| |
| if _, err := snap.Get(ptrKey, nil); err != nil { |
| t.Fatalf("READER #%d snapshot.Get %q: %v", i, ptrKey, err) |
| } |
| |
| if value, err := snap.Get(key, nil); err != nil { |
| t.Fatalf("READER #%d snapshot.Get %q: %v", i, key, err) |
| } else if prevValue != "" && string(value) != string(key)+prevValue { |
| t.Fatalf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value) |
| } else { |
| prevValue = string(value[len(key):]) |
| } |
| } |
| iter.Release() |
| snap.Release() |
| if k > 0 && k != n { |
| t.Fatalf("#%d %d != %d", i, k, n) |
| } |
| } |
| }() |
| wg.Wait() |
| } |
| |
| func TestDB_GetProperties(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| _, err := h.db.GetProperty("leveldb.num-files-at-level") |
| if err == nil { |
| t.Error("GetProperty() failed to detect missing level") |
| } |
| |
| _, err = h.db.GetProperty("leveldb.num-files-at-level0") |
| if err != nil { |
| t.Error("got unexpected error", err) |
| } |
| |
| _, err = h.db.GetProperty("leveldb.num-files-at-level0x") |
| if err == nil { |
| t.Error("GetProperty() failed to detect invalid level") |
| } |
| } |
| |
| func TestDB_GoleveldbIssue72and83(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 1 * opt.MiB, |
| OpenFilesCacheCapacity: 3, |
| }) |
| defer h.close() |
| |
| const n, wn, dur = 10000, 100, 30 * time.Second |
| |
| runtime.GOMAXPROCS(runtime.NumCPU()) |
| |
| randomData := func(prefix byte, i int) []byte { |
| data := make([]byte, 1+4+32+64+32) |
| _, err := crand.Reader.Read(data[1 : len(data)-8]) |
| if err != nil { |
| panic(err) |
| } |
| data[0] = prefix |
| binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i)) |
| binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value()) |
| return data |
| } |
| |
| keys := make([][]byte, n) |
| for i := range keys { |
| keys[i] = randomData(1, 0) |
| } |
| |
| until := time.Now().Add(dur) |
| wg := new(sync.WaitGroup) |
| wg.Add(3) |
| var done uint32 |
| go func() { |
| i := 0 |
| defer func() { |
| t.Logf("WRITER DONE #%d", i) |
| wg.Done() |
| }() |
| |
| b := new(Batch) |
| for ; i < wn && atomic.LoadUint32(&done) == 0; i++ { |
| b.Reset() |
| for _, k1 := range keys { |
| k2 := randomData(2, i) |
| b.Put(k2, randomData(42, i)) |
| b.Put(k1, k2) |
| } |
| if err := h.db.Write(b, h.wo); err != nil { |
| atomic.StoreUint32(&done, 1) |
| t.Fatalf("WRITER #%d db.Write: %v", i, err) |
| } |
| } |
| }() |
| go func() { |
| var i int |
| defer func() { |
| t.Logf("READER0 DONE #%d", i) |
| atomic.StoreUint32(&done, 1) |
| wg.Done() |
| }() |
| for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { |
| snap := h.getSnapshot() |
| seq := snap.elem.seq |
| if seq == 0 { |
| snap.Release() |
| continue |
| } |
| iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil) |
| writei := int(seq/(n*2) - 1) |
| var k int |
| for ; iter.Next(); k++ { |
| k1 := iter.Key() |
| k2 := iter.Value() |
| k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:]) |
| k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value() |
| if k1checksum0 != k1checksum1 { |
					t.Fatalf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum1)
| } |
| k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:]) |
| k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value() |
| if k2checksum0 != k2checksum1 { |
					t.Fatalf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
| } |
| kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:])) |
| if writei != kwritei { |
| t.Fatalf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei) |
| } |
| if _, err := snap.Get(k2, nil); err != nil { |
| t.Fatalf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2) |
| } |
| } |
| if err := iter.Error(); err != nil { |
| t.Fatalf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err) |
| } |
| iter.Release() |
| snap.Release() |
| if k > 0 && k != n { |
| t.Fatalf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n) |
| } |
| } |
| }() |
| go func() { |
| var i int |
| defer func() { |
| t.Logf("READER1 DONE #%d", i) |
| atomic.StoreUint32(&done, 1) |
| wg.Done() |
| }() |
| for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ { |
| iter := h.db.NewIterator(nil, nil) |
| seq := iter.(*dbIter).seq |
| if seq == 0 { |
| iter.Release() |
| continue |
| } |
| writei := int(seq/(n*2) - 1) |
| var k int |
| for ok := iter.Last(); ok; ok = iter.Prev() { |
| k++ |
| } |
| if err := iter.Error(); err != nil { |
| t.Fatalf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err) |
| } |
| iter.Release() |
| if m := (writei+1)*n + n; k != m { |
| t.Fatalf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m) |
| } |
| } |
| }() |
| |
| wg.Wait() |
| } |
| |
| func TestDB_TransientError(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 128 * opt.KiB, |
| OpenFilesCacheCapacity: 3, |
| DisableCompactionBackoff: true, |
| }) |
| defer h.close() |
| |
| const ( |
| nSnap = 20 |
| nKey = 10000 |
| ) |
| |
| var ( |
| snaps [nSnap]*Snapshot |
| b = &Batch{} |
| ) |
| for i := range snaps { |
| vtail := fmt.Sprintf("VAL%030d", i) |
| b.Reset() |
| for k := 0; k < nKey; k++ { |
| key := fmt.Sprintf("KEY%8d", k) |
| b.Put([]byte(key), []byte(key+vtail)) |
| } |
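| // Inject a transient table read error: the write may fail, but once the |
| // error is cleared a retry must eventually succeed, and the failure must |
| // never be reported as corruption. |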
| h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error")) |
| if err := h.db.Write(b, nil); err != nil { |
| t.Logf("WRITE #%d error: %v", i, err) |
| h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) |
| for { |
| if err := h.db.Write(b, nil); err == nil { |
| break |
| } else if errors.IsCorrupted(err) { |
| t.Fatalf("WRITE #%d corrupted: %v", i, err) |
| } |
| } |
| } |
| |
| snaps[i] = h.db.newSnapshot() |
| b.Reset() |
| for k := 0; k < nKey; k++ { |
| key := fmt.Sprintf("KEY%8d", k) |
| b.Delete([]byte(key)) |
| } |
| h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error")) |
| if err := h.db.Write(b, nil); err != nil { |
| t.Logf("WRITE #%d error: %v", i, err) |
| h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) |
| for { |
| if err := h.db.Write(b, nil); err == nil { |
| break |
| } else if errors.IsCorrupted(err) { |
| t.Fatalf("WRITE #%d corrupted: %v", i, err) |
| } |
| } |
| } |
| } |
| h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil) |
| |
| runtime.GOMAXPROCS(runtime.NumCPU()) |
| |
| rnd := rand.New(rand.NewSource(0xecafdaed)) |
| wg := &sync.WaitGroup{} |
| for i, snap := range snaps { |
| wg.Add(2) |
| |
| go func(i int, snap *Snapshot, sk []int) { |
| defer wg.Done() |
| |
| vtail := fmt.Sprintf("VAL%030d", i) |
| for _, k := range sk { |
| key := fmt.Sprintf("KEY%8d", k) |
| xvalue, err := snap.Get([]byte(key), nil) |
| if err != nil { |
| t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err) |
| return |
| } |
| value := key + vtail |
| if !bytes.Equal([]byte(value), xvalue) { |
| t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue) |
| return |
| } |
| } |
| }(i, snap, rnd.Perm(nKey)) |
| |
| go func(i int, snap *Snapshot) { |
| defer wg.Done() |
| |
| vtail := fmt.Sprintf("VAL%030d", i) |
| iter := snap.NewIterator(nil, nil) |
| defer iter.Release() |
| for k := 0; k < nKey; k++ { |
| if !iter.Next() { |
| if err := iter.Error(); err != nil { |
| t.Errorf("READER_ITER #%d K%d error: %v", i, k, err) |
| } else { |
| t.Errorf("READER_ITER #%d K%d eoi", i, k) |
| } |
| return |
| } |
| key := fmt.Sprintf("KEY%8d", k) |
| xkey := iter.Key() |
| if !bytes.Equal([]byte(key), xkey) { |
| t.Errorf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey) |
| return |
| } |
| value := key + vtail |
| xvalue := iter.Value() |
| if !bytes.Equal([]byte(value), xvalue) { |
| t.Errorf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue) |
| return |
| } |
| } |
| }(i, snap) |
| } |
| |
| wg.Wait() |
| } |
| |
| func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 112 * opt.KiB, |
| CompactionTableSize: 90 * opt.KiB, |
| CompactionExpandLimitFactor: 1, |
| }) |
| defer h.close() |
| |
| const ( |
| nSnap = 190 |
| nKey = 140 |
| ) |
| |
| var ( |
| snaps [nSnap]*Snapshot |
| b = &Batch{} |
| ) |
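| // Each round writes every key, snapshots the result, then deletes every |
| // key, piling up many live and deleted versions of the same user keys. |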
| for i := range snaps { |
| vtail := fmt.Sprintf("VAL%030d", i) |
| b.Reset() |
| for k := 0; k < nKey; k++ { |
| key := fmt.Sprintf("KEY%08d", k) |
| b.Put([]byte(key), []byte(key+vtail)) |
| } |
| if err := h.db.Write(b, nil); err != nil { |
| t.Fatalf("WRITE #%d error: %v", i, err) |
| } |
| |
| snaps[i] = h.db.newSnapshot() |
| b.Reset() |
| for k := 0; k < nKey; k++ { |
| key := fmt.Sprintf("KEY%08d", k) |
| b.Delete([]byte(key)) |
| } |
| if err := h.db.Write(b, nil); err != nil { |
| t.Fatalf("WRITE #%d error: %v", i, err) |
| } |
| } |
| |
| h.compactMem() |
| |
| h.waitCompaction() |
| for level, tables := range h.db.s.stVersion.levels { |
| for _, table := range tables { |
| t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) |
| } |
| } |
| |
| h.compactRangeAt(0, "", "") |
| h.waitCompaction() |
| for level, tables := range h.db.s.stVersion.levels { |
| for _, table := range tables { |
| t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) |
| } |
| } |
| h.compactRangeAt(1, "", "") |
| h.waitCompaction() |
| for level, tables := range h.db.s.stVersion.levels { |
| for _, table := range tables { |
| t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax) |
| } |
| } |
| runtime.GOMAXPROCS(runtime.NumCPU()) |
| |
| wg := &sync.WaitGroup{} |
| for i, snap := range snaps { |
| wg.Add(1) |
| |
| go func(i int, snap *Snapshot) { |
| defer wg.Done() |
| |
| vtail := fmt.Sprintf("VAL%030d", i) |
| for k := 0; k < nKey; k++ { |
| key := fmt.Sprintf("KEY%08d", k) |
| xvalue, err := snap.Get([]byte(key), nil) |
| if err != nil { |
| t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err) |
| return |
| } |
| value := key + vtail |
| if !bytes.Equal([]byte(value), xvalue) { |
| t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue) |
| return |
| } |
| } |
| }(i, snap) |
| } |
| |
| wg.Wait() |
| } |
| |
| func TestDB_TableCompactionBuilder(t *testing.T) { |
| gomega.RegisterTestingT(t) |
| stor := testutil.NewStorage() |
| stor.OnLog(testingLogger(t)) |
| stor.OnClose(testingPreserveOnFailed(t)) |
| defer stor.Close() |
| |
| const nSeq = 99 |
| |
| o := &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| WriteBuffer: 112 * opt.KiB, |
| CompactionTableSize: 43 * opt.KiB, |
| CompactionExpandLimitFactor: 1, |
| CompactionGPOverlapsFactor: 1, |
| DisableBlockCache: true, |
| } |
| s, err := newSession(stor, o) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err := s.create(); err != nil { |
| t.Fatal(err) |
| } |
| defer s.close() |
| var ( |
| seq uint64 |
| targetSize = 5 * o.CompactionTableSize |
| value = bytes.Repeat([]byte{'0'}, 100) |
| ) |
| for i := 0; i < 2; i++ { |
| tw, err := s.tops.create() |
| if err != nil { |
| t.Fatal(err) |
| } |
| for k := 0; tw.tw.BytesLen() < targetSize; k++ { |
| key := []byte(fmt.Sprintf("%09d", k)) |
| seq += nSeq - 1 |
| for x := uint64(0); x < nSeq; x++ { |
| if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil { |
| t.Fatal(err) |
| } |
| } |
| } |
| tf, err := tw.finish() |
| if err != nil { |
| t.Fatal(err) |
| } |
| rec := &sessionRecord{} |
| rec.addTableFile(i, tf) |
| if err := s.commit(rec); err != nil { |
| t.Fatal(err) |
| } |
| } |
| |
| // Build grandparent. |
| v := s.version() |
| c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...)) |
| rec := &sessionRecord{} |
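| // The output table size is deliberately odd (a third of CompactionTableSize |
| // plus 961 bytes), presumably to force table splits at awkward key boundaries. |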
| b := &tableCompactionBuilder{ |
| s: s, |
| c: c, |
| rec: rec, |
| stat1: new(cStatStaging), |
| minSeq: 0, |
| strict: true, |
| tableSize: o.CompactionTableSize/3 + 961, |
| } |
| if err := b.run(new(compactionTransactCounter)); err != nil { |
| t.Fatal(err) |
| } |
| for _, tbl := range c.levels[0] { |
| rec.delTable(c.sourceLevel, tbl.fd.Num) |
| } |
| if err := s.commit(rec); err != nil { |
| t.Fatal(err) |
| } |
| c.release() |
| |
| // Build level-1. |
| v = s.version() |
| c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...)) |
| rec = &sessionRecord{} |
| b = &tableCompactionBuilder{ |
| s: s, |
| c: c, |
| rec: rec, |
| stat1: new(cStatStaging), |
| minSeq: 0, |
| strict: true, |
| tableSize: o.CompactionTableSize, |
| } |
| if err := b.run(new(compactionTransactCounter)); err != nil { |
| t.Fatal(err) |
| } |
| for _, tbl := range c.levels[0] { |
| rec.delTable(c.sourceLevel, tbl.fd.Num) |
| } |
| // Move the grandparent tables to level-3. |
| for _, tbl := range v.levels[2] { |
| rec.delTable(2, tbl.fd.Num) |
| rec.addTableFile(3, tbl) |
| } |
| if err := s.commit(rec); err != nil { |
| t.Fatal(err) |
| } |
| c.release() |
| |
| v = s.version() |
| for level, want := range []bool{false, true, false, true} { |
| got := len(v.levels[level]) > 0 |
| if want != got { |
| t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got) |
| } |
| } |
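| // Adjacent tables within a level must not share a user key at their |
| // boundary; otherwise versions of one user key would hop across table files. |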
| for i, f := range v.levels[1][:len(v.levels[1])-1] { |
| nf := v.levels[1][i+1] |
| if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) { |
| t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num) |
| } |
| } |
| v.release() |
| |
| // Compaction with transient error. |
| v = s.version() |
| c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...)) |
| rec = &sessionRecord{} |
| b = &tableCompactionBuilder{ |
| s: s, |
| c: c, |
| rec: rec, |
| stat1: new(cStatStaging), |
| minSeq: 0, |
| strict: true, |
| tableSize: o.CompactionTableSize, |
| } |
| stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)")) |
| stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error")) |
| for { |
| if err := b.run(new(compactionTransactCounter)); err != nil { |
| t.Logf("(expected) b.run: %v", err) |
| } else { |
| break |
| } |
| } |
| if err := s.commit(rec); err != nil { |
| t.Fatal(err) |
| } |
| c.release() |
| |
| stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil) |
| stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil) |
| |
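| // The level-1 source tables were intentionally not deleted by the commit |
| // above, so the retried compaction's level-2 output can be verified |
| // against them record for record. |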
| v = s.version() |
| if len(v.levels[1]) != len(v.levels[2]) { |
| t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2])) |
| } |
| for i, f0 := range v.levels[1] { |
| f1 := v.levels[2][i] |
| iter0 := s.tops.newIterator(f0, nil, nil) |
| iter1 := s.tops.newIterator(f1, nil, nil) |
| for j := 0; ; j++ { |
| next0 := iter0.Next() |
| next1 := iter1.Next() |
| if next0 != next1 { |
| t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1) |
| } |
| key0 := iter0.Key() |
| key1 := iter1.Key() |
| if !bytes.Equal(key0, key1) { |
| t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1) |
| } |
| if !next0 { |
| break |
| } |
| } |
| iter0.Release() |
| iter1.Release() |
| } |
| v.release() |
| } |
| |
| func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) { |
| const ( |
| vSize = 200 * opt.KiB |
| tSize = 100 * opt.MiB |
| mIter = 100 |
| n = tSize / vSize |
| ) |
| |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| DisableBlockCache: true, |
| }) |
| defer h.close() |
| |
| h.db.memdbMaxLevel = 2 |
| |
| key := func(x int) string { |
| return fmt.Sprintf("v%06d", x) |
| } |
| |
| // Fill. |
| value := strings.Repeat("x", vSize) |
| for i := 0; i < n; i++ { |
| h.put(key(i), value) |
| } |
| h.compactMem() |
| |
| // Delete all. |
| for i := 0; i < n; i++ { |
| h.delete(key(i)) |
| } |
| h.compactMem() |
| |
| var ( |
| limit = n / limitDiv |
| |
| startKey = key(0) |
| limitKey = key(limit) |
| maxKey = key(n) |
| slice = &util.Range{Limit: []byte(limitKey)} |
| |
| initialSize0 = h.sizeOf(startKey, limitKey) |
| initialSize1 = h.sizeOf(limitKey, maxKey) |
| ) |
| |
| t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1))) |
| |
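| // Repeatedly scan the fully deleted range; the iterator reads are expected |
| // to trigger compaction that eventually shrinks the range to under a tenth |
| // of its initial size. |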
| for r := 0; ; r++ { |
| if r >= mIter { |
| t.Fatal("taking too long to compact") |
| } |
| |
| // Iterate over the deleted range. |
| iter := h.db.NewIterator(slice, h.ro) |
| for iter.Next() { |
| } |
| if err := iter.Error(); err != nil { |
| t.Fatalf("Iter err: %v", err) |
| } |
| iter.Release() |
| |
| // Wait for compaction. |
| h.waitCompaction() |
| |
| // Check size. |
| size0 := h.sizeOf(startKey, limitKey) |
| size1 := h.sizeOf(limitKey, maxKey) |
| t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1))) |
| if size0 < initialSize0/10 { |
| break |
| } |
| } |
| |
| if initialSize1 > 0 { |
| h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB) |
| } |
| } |
| |
| func TestDB_IterTriggeredCompaction(t *testing.T) { |
| testDB_IterTriggeredCompaction(t, 1) |
| } |
| |
| func TestDB_IterTriggeredCompactionHalf(t *testing.T) { |
| testDB_IterTriggeredCompaction(t, 2) |
| } |
| |
| func TestDB_ReadOnly(t *testing.T) { |
| h := newDbHarness(t) |
| defer h.close() |
| |
| h.put("foo", "v1") |
| h.put("bar", "v2") |
| h.compactMem() |
| |
| h.put("xfoo", "v1") |
| h.put("xbar", "v2") |
| |
| t.Log("Trigger read-only") |
| if err := h.db.SetReadOnly(); err != nil { |
| t.Fatalf("SetReadOnly error: %v", err) |
| } |
| |
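| // Emulate an error on every mutating storage operation so that any write |
| // attempted by the read-only DB fails loudly. |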
| mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync |
| h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't write")) |
| |
| ro := func(key, value, wantValue string) { |
| if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly { |
| t.Fatalf("unexpected error: %v", err) |
| } |
| h.getVal(key, wantValue) |
| } |
| |
| ro("foo", "vx", "v1") |
| |
| h.o.ReadOnly = true |
| h.reopenDB() |
| |
| ro("foo", "vx", "v1") |
| ro("bar", "vx", "v2") |
| h.assertNumKeys(4) |
| } |
| |
| func TestDB_BulkInsertDelete(t *testing.T) { |
| h := newDbHarnessWopt(t, &opt.Options{ |
| DisableLargeBatchTransaction: true, |
| Compression: opt.NoCompression, |
| CompactionTableSize: 128 * opt.KiB, |
| CompactionTotalSize: 1 * opt.MiB, |
| WriteBuffer: 256 * opt.KiB, |
| }) |
| defer h.close() |
| |
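| // Repeatedly insert then delete the same keys; compaction is expected to |
| // drop the tombstoned data rather than let tables accumulate, which the |
| // table count below asserts. |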
| const R = 100 |
| const N = 2500 |
| key := make([]byte, 4) |
| value := make([]byte, 256) |
| for i := 0; i < R; i++ { |
| offset := N * i |
| for j := 0; j < N; j++ { |
| binary.BigEndian.PutUint32(key, uint32(offset+j)) |
| if err := h.db.Put(key, value, nil); err != nil { |
| t.Fatal("Put: got error: ", err) |
| } |
| } |
| for j := 0; j < N; j++ { |
| binary.BigEndian.PutUint32(key, uint32(offset+j)) |
| if err := h.db.Delete(key, nil); err != nil { |
| t.Fatal("Delete: got error: ", err) |
| } |
| } |
| } |
| |
| if tot := h.totalTables(); tot > 10 { |
| t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel()) |
| } |
| } |