leveldb: add concurrent write testing #136
diff --git a/.travis.yml b/.travis.yml
index 8288a4c..1428e77 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,4 +6,6 @@
- 1.4
- tip
-script: go test -timeout 1h ./...
+script:
+ - go test -timeout 1h ./...
+ - go test -timeout 30m -race -run "TestDB_(Concurrent|GoleveldbIssue74)" ./leveldb
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 9be0487..37d08eb 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -1740,139 +1740,165 @@
}
func TestDB_Concurrent(t *testing.T) {
- const n, secs, maxkey = 4, 2, 1000
+ const n, secs, maxkey = 4, 6, 1000
+ h := newDbHarness(t)
+ defer h.close()
- runtime.GOMAXPROCS(n)
- trun(t, func(h *dbHarness) {
- var closeWg sync.WaitGroup
- var stop uint32
- var cnt [n]uint32
+ runtime.GOMAXPROCS(runtime.NumCPU())
- for i := 0; i < n; i++ {
- closeWg.Add(1)
- go func(i int) {
- var put, get, found uint
- defer func() {
- t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
- i, cnt[i], put, get, found, get-found)
- closeWg.Done()
- }()
+ var (
+ closeWg sync.WaitGroup
+ stop uint32
+ cnt [n]uint32
+ )
- rnd := rand.New(rand.NewSource(int64(1000 + i)))
- for atomic.LoadUint32(&stop) == 0 {
- x := cnt[i]
+ for i := 0; i < n; i++ {
+ closeWg.Add(1)
+ go func(i int) {
+ var put, get, found uint
+ defer func() {
+ t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
+ i, cnt[i], put, get, found, get-found)
+ closeWg.Done()
+ }()
- k := rnd.Intn(maxkey)
- kstr := fmt.Sprintf("%016d", k)
+ rnd := rand.New(rand.NewSource(int64(1000 + i)))
+ for atomic.LoadUint32(&stop) == 0 {
+ x := cnt[i]
- if (rnd.Int() % 2) > 0 {
- put++
- h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
- } else {
- get++
- v, err := h.db.Get([]byte(kstr), h.ro)
- if err == nil {
- found++
- rk, ri, rx := 0, -1, uint32(0)
- fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
- if rk != k {
- t.Errorf("invalid key want=%d got=%d", k, rk)
- }
- if ri < 0 || ri >= n {
- t.Error("invalid goroutine number: ", ri)
- } else {
- tx := atomic.LoadUint32(&(cnt[ri]))
- if rx > tx {
- t.Errorf("invalid seq number, %d > %d ", rx, tx)
- }
- }
- } else if err != ErrNotFound {
- t.Error("Get: got error: ", err)
- return
+ k := rnd.Intn(maxkey)
+ kstr := fmt.Sprintf("%016d", k)
+
+ if (rnd.Int() % 2) > 0 {
+ put++
+ h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
+ } else {
+ get++
+ v, err := h.db.Get([]byte(kstr), h.ro)
+ if err == nil {
+ found++
+ rk, ri, rx := 0, -1, uint32(0)
+ fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
+ if rk != k {
+ t.Errorf("invalid key want=%d got=%d", k, rk)
}
+ if ri < 0 || ri >= n {
+ t.Error("invalid goroutine number: ", ri)
+ } else {
+ tx := atomic.LoadUint32(&(cnt[ri]))
+ if rx > tx {
+ t.Errorf("invalid seq number, %d > %d ", rx, tx)
+ }
+ }
+ } else if err != ErrNotFound {
+ t.Error("Get: got error: ", err)
+ return
}
- atomic.AddUint32(&cnt[i], 1)
}
- }(i)
- }
+ atomic.AddUint32(&cnt[i], 1)
+ }
+ }(i)
+ }
- time.Sleep(secs * time.Second)
- atomic.StoreUint32(&stop, 1)
- closeWg.Wait()
- })
-
- runtime.GOMAXPROCS(1)
+ time.Sleep(secs * time.Second)
+ atomic.StoreUint32(&stop, 1)
+ closeWg.Wait()
}
-func TestDB_Concurrent2(t *testing.T) {
- const n, n2 = 4, 4000
+func TestDB_ConcurrentIterator(t *testing.T) {
+ const n, n2 = 4, 1000
+ h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
+ defer h.close()
- runtime.GOMAXPROCS(n*2 + 2)
- truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) {
- var closeWg sync.WaitGroup
- var stop uint32
+ runtime.GOMAXPROCS(runtime.NumCPU())
- for i := 0; i < n; i++ {
- closeWg.Add(1)
- go func(i int) {
- for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
- h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+ var (
+ closeWg sync.WaitGroup
+ stop uint32
+ )
+
+ for i := 0; i < n; i++ {
+ closeWg.Add(1)
+ go func(i int) {
+ for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
+ h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+ }
+ closeWg.Done()
+ }(i)
+ }
+
+ for i := 0; i < n; i++ {
+ closeWg.Add(1)
+ go func(i int) {
+ for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
+ h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+ }
+ closeWg.Done()
+ }(i)
+ }
+
+ cmp := comparer.DefaultComparer
+ for i := 0; i < n2; i++ {
+ closeWg.Add(1)
+ go func(i int) {
+ it := h.db.NewIterator(nil, nil)
+ var pk []byte
+ for it.Next() {
+ kk := it.Key()
+ if cmp.Compare(kk, pk) <= 0 {
+ t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
}
- closeWg.Done()
- }(i)
- }
-
- for i := 0; i < n; i++ {
- closeWg.Add(1)
- go func(i int) {
- for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
- h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+ pk = append(pk[:0], kk...)
+ var k, vk, vi int
+ if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
+ t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
+ } else if n < 1 {
+ t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
}
- closeWg.Done()
- }(i)
- }
-
- cmp := comparer.DefaultComparer
- for i := 0; i < n2; i++ {
- closeWg.Add(1)
- go func(i int) {
- it := h.db.NewIterator(nil, nil)
- var pk []byte
- for it.Next() {
- kk := it.Key()
- if cmp.Compare(kk, pk) <= 0 {
- t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
- }
- pk = append(pk[:0], kk...)
- var k, vk, vi int
- if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
- t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
- } else if n < 1 {
- t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
- }
- if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
- t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
- } else if n < 2 {
- t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
- }
-
- if vk != k {
- t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
- }
+ if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
+ t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
+ } else if n < 2 {
+ t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
}
- if err := it.Error(); err != nil {
- t.Errorf("iter %d: Got error: %v", i, err)
+
+ if vk != k {
+ t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
}
- it.Release()
- closeWg.Done()
- }(i)
- }
+ }
+ if err := it.Error(); err != nil {
+ t.Errorf("iter %d: Got error: %v", i, err)
+ }
+ it.Release()
+ closeWg.Done()
+ }(i)
+ }
- atomic.StoreUint32(&stop, 1)
- closeWg.Wait()
- })
+ atomic.StoreUint32(&stop, 1)
+ closeWg.Wait()
+}
- runtime.GOMAXPROCS(1)
+func TestDB_ConcurrentWrite(t *testing.T) {
+ const n, niter = 10, 10000
+ h := newDbHarness(t)
+ defer h.close()
+
+ runtime.GOMAXPROCS(runtime.NumCPU())
+
+ var wg sync.WaitGroup
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ for k := 0; k < niter; k++ {
+ kstr := fmt.Sprintf("%d.%d", i, k)
+ vstr := fmt.Sprintf("v%d", k)
+ h.put(kstr, vstr)
+ // Key should immediately available after put returns.
+ h.getVal(kstr, vstr)
+ }
+ }(i)
+ }
+ wg.Wait()
}
func TestDB_CreateReopenDbOnFile(t *testing.T) {