leveldb: add concurrent write testing #136
diff --git a/.travis.yml b/.travis.yml
index 8288a4c..1428e77 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,4 +6,6 @@
   - 1.4
   - tip
 
-script: go test -timeout 1h ./...
+script:
+  - go test -timeout 1h ./...
+  - go test -timeout 30m -race -run "TestDB_(Concurrent|GoleveldbIssue74)" ./leveldb
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 9be0487..37d08eb 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -1740,139 +1740,165 @@
 }
 
 func TestDB_Concurrent(t *testing.T) {
-	const n, secs, maxkey = 4, 2, 1000
+	const n, secs, maxkey = 4, 6, 1000
+	h := newDbHarness(t)
+	defer h.close()
 
-	runtime.GOMAXPROCS(n)
-	trun(t, func(h *dbHarness) {
-		var closeWg sync.WaitGroup
-		var stop uint32
-		var cnt [n]uint32
+	runtime.GOMAXPROCS(runtime.NumCPU())
 
-		for i := 0; i < n; i++ {
-			closeWg.Add(1)
-			go func(i int) {
-				var put, get, found uint
-				defer func() {
-					t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
-						i, cnt[i], put, get, found, get-found)
-					closeWg.Done()
-				}()
+	var (
+		closeWg sync.WaitGroup
+		stop    uint32
+		cnt     [n]uint32
+	)
 
-				rnd := rand.New(rand.NewSource(int64(1000 + i)))
-				for atomic.LoadUint32(&stop) == 0 {
-					x := cnt[i]
+	for i := 0; i < n; i++ {
+		closeWg.Add(1)
+		go func(i int) {
+			var put, get, found uint
+			defer func() {
+				t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
+					i, cnt[i], put, get, found, get-found)
+				closeWg.Done()
+			}()
 
-					k := rnd.Intn(maxkey)
-					kstr := fmt.Sprintf("%016d", k)
+			rnd := rand.New(rand.NewSource(int64(1000 + i)))
+			for atomic.LoadUint32(&stop) == 0 {
+				x := cnt[i]
 
-					if (rnd.Int() % 2) > 0 {
-						put++
-						h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
-					} else {
-						get++
-						v, err := h.db.Get([]byte(kstr), h.ro)
-						if err == nil {
-							found++
-							rk, ri, rx := 0, -1, uint32(0)
-							fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
-							if rk != k {
-								t.Errorf("invalid key want=%d got=%d", k, rk)
-							}
-							if ri < 0 || ri >= n {
-								t.Error("invalid goroutine number: ", ri)
-							} else {
-								tx := atomic.LoadUint32(&(cnt[ri]))
-								if rx > tx {
-									t.Errorf("invalid seq number, %d > %d ", rx, tx)
-								}
-							}
-						} else if err != ErrNotFound {
-							t.Error("Get: got error: ", err)
-							return
+				k := rnd.Intn(maxkey)
+				kstr := fmt.Sprintf("%016d", k)
+
+				if (rnd.Int() % 2) > 0 {
+					put++
+					h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
+				} else {
+					get++
+					v, err := h.db.Get([]byte(kstr), h.ro)
+					if err == nil {
+						found++
+						rk, ri, rx := 0, -1, uint32(0)
+						fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
+						if rk != k {
+							t.Errorf("invalid key want=%d got=%d", k, rk)
 						}
+						if ri < 0 || ri >= n {
+							t.Error("invalid goroutine number: ", ri)
+						} else {
+							tx := atomic.LoadUint32(&(cnt[ri]))
+							if rx > tx {
+								t.Errorf("invalid seq number, %d > %d ", rx, tx)
+							}
+						}
+					} else if err != ErrNotFound {
+						t.Error("Get: got error: ", err)
+						return
 					}
-					atomic.AddUint32(&cnt[i], 1)
 				}
-			}(i)
-		}
+				atomic.AddUint32(&cnt[i], 1)
+			}
+		}(i)
+	}
 
-		time.Sleep(secs * time.Second)
-		atomic.StoreUint32(&stop, 1)
-		closeWg.Wait()
-	})
-
-	runtime.GOMAXPROCS(1)
+	time.Sleep(secs * time.Second)
+	atomic.StoreUint32(&stop, 1)
+	closeWg.Wait()
 }
 
-func TestDB_Concurrent2(t *testing.T) {
-	const n, n2 = 4, 4000
+func TestDB_ConcurrentIterator(t *testing.T) {
+	const n, n2 = 4, 1000
+	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
+	defer h.close()
 
-	runtime.GOMAXPROCS(n*2 + 2)
-	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30}, func(h *dbHarness) {
-		var closeWg sync.WaitGroup
-		var stop uint32
+	runtime.GOMAXPROCS(runtime.NumCPU())
 
-		for i := 0; i < n; i++ {
-			closeWg.Add(1)
-			go func(i int) {
-				for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
-					h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+	var (
+		closeWg sync.WaitGroup
+		stop    uint32
+	)
+
+	for i := 0; i < n; i++ {
+		closeWg.Add(1)
+		go func(i int) {
+			for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
+				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+			}
+			closeWg.Done()
+		}(i)
+	}
+
+	for i := 0; i < n; i++ {
+		closeWg.Add(1)
+		go func(i int) {
+			for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
+				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+			}
+			closeWg.Done()
+		}(i)
+	}
+
+	cmp := comparer.DefaultComparer
+	for i := 0; i < n2; i++ {
+		closeWg.Add(1)
+		go func(i int) {
+			it := h.db.NewIterator(nil, nil)
+			var pk []byte
+			for it.Next() {
+				kk := it.Key()
+				if cmp.Compare(kk, pk) <= 0 {
+					t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
 				}
-				closeWg.Done()
-			}(i)
-		}
-
-		for i := 0; i < n; i++ {
-			closeWg.Add(1)
-			go func(i int) {
-				for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
-					h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
+				pk = append(pk[:0], kk...)
+				var k, vk, vi int
+				if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
+					t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
+				} else if n < 1 {
+					t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
 				}
-				closeWg.Done()
-			}(i)
-		}
-
-		cmp := comparer.DefaultComparer
-		for i := 0; i < n2; i++ {
-			closeWg.Add(1)
-			go func(i int) {
-				it := h.db.NewIterator(nil, nil)
-				var pk []byte
-				for it.Next() {
-					kk := it.Key()
-					if cmp.Compare(kk, pk) <= 0 {
-						t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
-					}
-					pk = append(pk[:0], kk...)
-					var k, vk, vi int
-					if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
-						t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
-					} else if n < 1 {
-						t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
-					}
-					if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
-						t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
-					} else if n < 2 {
-						t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
-					}
-
-					if vk != k {
-						t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
-					}
+				if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
+					t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
+				} else if n < 2 {
+					t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
 				}
-				if err := it.Error(); err != nil {
-					t.Errorf("iter %d: Got error: %v", i, err)
+
+				if vk != k {
+					t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
 				}
-				it.Release()
-				closeWg.Done()
-			}(i)
-		}
+			}
+			if err := it.Error(); err != nil {
+				t.Errorf("iter %d: Got error: %v", i, err)
+			}
+			it.Release()
+			closeWg.Done()
+		}(i)
+	}
 
-		atomic.StoreUint32(&stop, 1)
-		closeWg.Wait()
-	})
+	atomic.StoreUint32(&stop, 1)
+	closeWg.Wait()
+}
 
-	runtime.GOMAXPROCS(1)
+func TestDB_ConcurrentWrite(t *testing.T) {
+	const n, niter = 10, 10000
+	h := newDbHarness(t)
+	defer h.close()
+
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	var wg sync.WaitGroup
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			for k := 0; k < niter; k++ {
+				kstr := fmt.Sprintf("%d.%d", i, k)
+				vstr := fmt.Sprintf("v%d", k)
+				h.put(kstr, vstr)
+				// Key should immediately available after put returns.
+				h.getVal(kstr, vstr)
+			}
+		}(i)
+	}
+	wg.Wait()
 }
 
 func TestDB_CreateReopenDbOnFile(t *testing.T) {