manualtest: add 'leveldb' manual test
diff --git a/manualtest/leveldb/key.go b/manualtest/leveldb/key.go
new file mode 100644
index 0000000..906d0d8
--- /dev/null
+++ b/manualtest/leveldb/key.go
@@ -0,0 +1,136 @@
+package main
+
+import (
+	"encoding/binary"
+	"fmt"
+
+	"github.com/syndtr/goleveldb/leveldb/errors"
+)
+
+type ErrIkeyCorrupted struct {
+	Ikey   []byte
+	Reason string
+}
+
+func (e *ErrIkeyCorrupted) Error() string {
+	return fmt.Sprintf("leveldb: iKey %q corrupted: %s", e.Ikey, e.Reason)
+}
+
+func newErrIkeyCorrupted(ikey []byte, reason string) error {
+	return errors.NewErrCorrupted(nil, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
+}
+
+type kType int
+
+func (kt kType) String() string {
+	switch kt {
+	case ktDel:
+		return "d"
+	case ktVal:
+		return "v"
+	}
+	return "x"
+}
+
+// Value types encoded as the last component of internal keys.
+// Don't modify; this value are saved to disk.
+const (
+	ktDel kType = iota
+	ktVal
+)
+
+// ktSeek defines the kType that should be passed when constructing an
+// internal key for seeking to a particular sequence number (since we
+// sort sequence numbers in decreasing order and the value type is
+// embedded as the low 8 bits in the sequence number in internal keys,
+// we need to use the highest-numbered ValueType, not the lowest).
+const ktSeek = ktVal
+
+const (
+	// Maximum value possible for sequence number; the 8-bits are
+	// used by value type, so its can packed together in single
+	// 64-bit integer.
+	kMaxSeq uint64 = (uint64(1) << 56) - 1
+	// Maximum value possible for packed sequence number and type.
+	kMaxNum uint64 = (kMaxSeq << 8) | uint64(ktSeek)
+)
+
+// Maximum number encoded in bytes.
+var kMaxNumBytes = make([]byte, 8)
+
+func init() {
+	binary.LittleEndian.PutUint64(kMaxNumBytes, kMaxNum)
+}
+
+type iKey []byte
+
+func newIkey(ukey []byte, seq uint64, kt kType) iKey {
+	if seq > kMaxSeq {
+		panic("leveldb: invalid sequence number")
+	} else if kt > ktVal {
+		panic("leveldb: invalid type")
+	}
+
+	ik := make(iKey, len(ukey)+8)
+	copy(ik, ukey)
+	binary.LittleEndian.PutUint64(ik[len(ukey):], (seq<<8)|uint64(kt))
+	return ik
+}
+
+func parseIkey(ik []byte) (ukey []byte, seq uint64, kt kType, err error) {
+	if len(ik) < 8 {
+		return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid length")
+	}
+	num := binary.LittleEndian.Uint64(ik[len(ik)-8:])
+	seq, kt = uint64(num>>8), kType(num&0xff)
+	if kt > ktVal {
+		return nil, 0, 0, newErrIkeyCorrupted(ik, "invalid type")
+	}
+	ukey = ik[:len(ik)-8]
+	return
+}
+
+func validIkey(ik []byte) bool {
+	_, _, _, err := parseIkey(ik)
+	return err == nil
+}
+
+func (ik iKey) assert() {
+	if ik == nil {
+		panic("leveldb: nil iKey")
+	}
+	if len(ik) < 8 {
+		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid length", ik, len(ik)))
+	}
+}
+
+func (ik iKey) ukey() []byte {
+	ik.assert()
+	return ik[:len(ik)-8]
+}
+
+func (ik iKey) num() uint64 {
+	ik.assert()
+	return binary.LittleEndian.Uint64(ik[len(ik)-8:])
+}
+
+func (ik iKey) parseNum() (seq uint64, kt kType) {
+	num := ik.num()
+	seq, kt = uint64(num>>8), kType(num&0xff)
+	if kt > ktVal {
+		panic(fmt.Sprintf("leveldb: iKey %q, len=%d: invalid type %#x", ik, len(ik), kt))
+	}
+	return
+}
+
+func (ik iKey) String() string {
+	if ik == nil {
+		return "<nil>"
+	}
+
+	if ukey, seq, kt, err := parseIkey(ik); err == nil {
+		return fmt.Sprintf("%x,%s%d", ukey, kt, seq)
+	} else {
+		return "<invalid>"
+	}
+}
diff --git a/manualtest/leveldb/main.go b/manualtest/leveldb/main.go
new file mode 100644
index 0000000..1047a32
--- /dev/null
+++ b/manualtest/leveldb/main.go
@@ -0,0 +1,599 @@
+package main
+
+import (
+	"crypto/rand"
+	"encoding/binary"
+	"flag"
+	"fmt"
+	"github.com/syndtr/goleveldb/leveldb"
+	"github.com/syndtr/goleveldb/leveldb/errors"
+	"github.com/syndtr/goleveldb/leveldb/opt"
+	"github.com/syndtr/goleveldb/leveldb/storage"
+	"github.com/syndtr/goleveldb/leveldb/table"
+	"github.com/syndtr/goleveldb/leveldb/util"
+	"log"
+	mrand "math/rand"
+	"net/http"
+	_ "net/http/pprof"
+	"os"
+	"os/signal"
+	"path"
+	"runtime"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+var (
+	dbPath           = path.Join(os.TempDir(), "goleveldb-testdb")
+	cachedOpenFiles  = 500
+	dataLen          = 63
+	numKeys          = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743}
+	httpProf         = "127.0.0.1:5454"
+	enableBlockCache = false
+
+	wg         = new(sync.WaitGroup)
+	done, fail uint32
+
+	bpool = util.NewBufferPool(opt.DefaultBlockSize + 128)
+)
+
+type arrayInt []int
+
+func (a arrayInt) String() string {
+	var str string
+	for i, n := range a {
+		if i > 0 {
+			str += ","
+		}
+		str += strconv.Itoa(n)
+	}
+	return str
+}
+
+func (a *arrayInt) Set(str string) error {
+	var na arrayInt
+	for _, s := range strings.Split(str, ",") {
+		s = strings.TrimSpace(s)
+		if s != "" {
+			n, err := strconv.Atoi(s)
+			if err != nil {
+				return err
+			}
+			na = append(na, n)
+		}
+	}
+	*a = na
+	return nil
+}
+
+func init() {
+	flag.StringVar(&dbPath, "db", dbPath, "testdb path")
+	flag.IntVar(&cachedOpenFiles, "cachedopenfile", cachedOpenFiles, "cached open file")
+	flag.IntVar(&dataLen, "datalen", dataLen, "data length")
+	flag.Var(&numKeys, "numkeys", "num keys")
+	flag.StringVar(&httpProf, "httpprof", httpProf, "http prof listen addr")
+	flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache")
+}
+
+func randomData(dst []byte, ns, prefix byte, i uint32) []byte {
+	n := 2 + dataLen + 4 + 4
+	n2 := n*2 + 4
+	if cap(dst) < n2 {
+		dst = make([]byte, n2)
+	} else {
+		dst = dst[:n2]
+	}
+	_, err := rand.Reader.Read(dst[2 : n-8])
+	if err != nil {
+		panic(err)
+	}
+	dst[0] = ns
+	dst[1] = prefix
+	binary.LittleEndian.PutUint32(dst[n-8:], i)
+	binary.LittleEndian.PutUint32(dst[n-4:], util.NewCRC(dst[:n-4]).Value())
+	copy(dst[n:n+n], dst[:n])
+	binary.LittleEndian.PutUint32(dst[n2-4:], util.NewCRC(dst[:n2-4]).Value())
+	return dst
+}
+
+func dataSplit(data []byte) (data0, data1 []byte) {
+	n := (len(data) - 4) / 2
+	return data[:n], data[n : n+n]
+}
+
+func dataNS(data []byte) byte {
+	return data[0]
+}
+
+func dataPrefix(data []byte) byte {
+	return data[1]
+}
+
+func dataI(data []byte) uint32 {
+	return binary.LittleEndian.Uint32(data[len(data)-12:])
+}
+
+func dataChecksum(data []byte) (uint32, uint32) {
+	checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:])
+	checksum1 := util.NewCRC(data[:len(data)-4]).Value()
+	return checksum0, checksum1
+}
+
+func dataPrefixSlice(ns, prefix byte) *util.Range {
+	return util.BytesPrefix([]byte{ns, prefix})
+}
+
+func dataNsSlice(ns byte) *util.Range {
+	return util.BytesPrefix([]byte{ns})
+}
+
+type testingFile struct {
+	storage.File
+}
+
+func (tf *testingFile) Remove() error {
+	if atomic.LoadUint32(&fail) == 1 {
+		return nil
+	}
+
+	if tf.Type() == storage.TypeTable {
+		if scanTable(tf, true) {
+			return nil
+		}
+	}
+	return tf.File.Remove()
+}
+
+type testingStorage struct {
+	storage.Storage
+}
+
+func (ts *testingStorage) GetFile(num uint64, t storage.FileType) storage.File {
+	return &testingFile{ts.Storage.GetFile(num, t)}
+}
+
+func (ts *testingStorage) GetFiles(t storage.FileType) ([]storage.File, error) {
+	files, err := ts.Storage.GetFiles(t)
+	if err != nil {
+		return nil, err
+	}
+	for i := range files {
+		files[i] = &testingFile{files[i]}
+	}
+	return files, nil
+}
+
+func (ts *testingStorage) GetManifest() (storage.File, error) {
+	f, err := ts.Storage.GetManifest()
+	if err == nil {
+		f = &testingFile{f}
+	}
+	return f, err
+}
+
+func (ts *testingStorage) SetManifest(f storage.File) error {
+	return ts.Storage.SetManifest(f.(*testingFile).File)
+}
+
+type latencyStats struct {
+	mark          time.Time
+	dur, min, max time.Duration
+	num           int
+}
+
+func (s *latencyStats) start() {
+	s.mark = time.Now()
+}
+
+func (s *latencyStats) record(n int) {
+	if s.mark.IsZero() {
+		panic("not started")
+	}
+	dur := time.Now().Sub(s.mark)
+	dur1 := dur / time.Duration(n)
+	if dur1 < s.min || s.min == 0 {
+		s.min = dur1
+	}
+	if dur1 > s.max {
+		s.max = dur1
+	}
+	s.dur += dur
+	s.num += n
+	s.mark = time.Time{}
+}
+
+func (s *latencyStats) ratePerSec() int {
+	durSec := s.dur / time.Second
+	if durSec > 0 {
+		return s.num / int(durSec)
+	}
+	return s.num
+}
+
+func (s *latencyStats) avg() time.Duration {
+	if s.num > 0 {
+		return s.dur / time.Duration(s.num)
+	}
+	return 0
+}
+
+func (s *latencyStats) add(x *latencyStats) {
+	if x.min < s.min || s.min == 0 {
+		s.min = x.min
+	}
+	if x.max > s.max {
+		s.max = x.max
+	}
+	s.dur += x.dur
+	s.num += x.num
+}
+
+func scanTable(f storage.File, checksum bool) (corrupted bool) {
+	fi := storage.NewFileInfo(f)
+	r, err := f.Open()
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer r.Close()
+
+	size, err := r.Seek(0, os.SEEK_END)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	o := &opt.Options{Strict: opt.NoStrict}
+	if checksum {
+		o.Strict = opt.StrictBlockChecksum | opt.StrictReader
+	}
+	tr, err := table.NewReader(r, size, fi, nil, bpool, o)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer tr.Release()
+
+	checkData := func(i int, t string, data []byte) bool {
+		if len(data) == 0 {
+			panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fi, i, t))
+		}
+
+		checksum0, checksum1 := dataChecksum(data)
+		if checksum0 != checksum1 {
+			atomic.StoreUint32(&fail, 1)
+			atomic.StoreUint32(&done, 1)
+			corrupted = true
+
+			data0, data1 := dataSplit(data)
+			data0c0, data0c1 := dataChecksum(data0)
+			data1c0, data1c1 := dataChecksum(data1)
+			log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
+				fi, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
+			return true
+		}
+		return false
+	}
+
+	iter := tr.NewIterator(nil, nil)
+	defer iter.Release()
+	for i := 0; iter.Next(); i++ {
+		ukey, _, kt, kerr := parseIkey(iter.Key())
+		if kerr != nil {
+			atomic.StoreUint32(&fail, 1)
+			atomic.StoreUint32(&done, 1)
+			corrupted = true
+
+			log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fi, i, kerr)
+			return
+		}
+		if checkData(i, "key", ukey) {
+			return
+		}
+		if kt == ktVal && checkData(i, "value", iter.Value()) {
+			return
+		}
+	}
+	if err := iter.Error(); err != nil {
+		if errors.IsCorrupted(err) {
+			atomic.StoreUint32(&fail, 1)
+			atomic.StoreUint32(&done, 1)
+			corrupted = true
+
+			log.Printf("FATAL: [%v] Corruption detected: %v", fi, err)
+		} else {
+			log.Fatal(err)
+		}
+	}
+
+	return
+}
+
+func main() {
+	flag.Parse()
+
+	if httpProf != "" {
+		runtime.SetBlockProfileRate(1)
+		go func() {
+			if err := http.ListenAndServe(httpProf, nil); err != nil {
+				log.Fatalf("HTTPPROF: %v", err)
+			}
+		}()
+	}
+
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	os.RemoveAll(dbPath)
+	stor, err := storage.OpenFile(dbPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+	stor = &testingStorage{stor}
+	defer stor.Close()
+
+	fatalf := func(err error, format string, v ...interface{}) {
+		atomic.StoreUint32(&fail, 1)
+		atomic.StoreUint32(&done, 1)
+		log.Printf("FATAL: "+format, v...)
+		if err != nil && errors.IsCorrupted(err) {
+			cerr := err.(*errors.ErrCorrupted)
+			if cerr.File != nil && cerr.File.Type == storage.TypeTable {
+				if !scanTable(stor.GetFile(cerr.File.Num, cerr.File.Type), false) {
+					log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.File)
+				}
+			}
+		}
+		runtime.Goexit()
+	}
+
+	o := &opt.Options{
+		CachedOpenFiles: cachedOpenFiles,
+		ErrorIfExist:    true,
+	}
+	if !enableBlockCache {
+		o.BlockCache = opt.NoCache
+	}
+
+	db, err := leveldb.Open(stor, o)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer db.Close()
+
+	var (
+		mu         = &sync.Mutex{}
+		gGetStat   = &latencyStats{}
+		gIterStat  = &latencyStats{}
+		gWriteStat = &latencyStats{}
+		startTime  = time.Now()
+
+		writeReq    = make(chan *leveldb.Batch)
+		writeAck    = make(chan error)
+		writeAckAck = make(chan struct{})
+	)
+
+	go func() {
+		for b := range writeReq {
+			gWriteStat.start()
+			err := db.Write(b, nil)
+			if err == nil {
+				gWriteStat.record(b.Len())
+			}
+			writeAck <- err
+			<-writeAckAck
+		}
+	}()
+
+	go func() {
+		for {
+			time.Sleep(3 * time.Second)
+
+			log.Print("------------------------")
+
+			log.Printf("> Elapsed=%v", time.Now().Sub(startTime))
+			mu.Lock()
+			log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d",
+				gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec())
+			log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v WriteRatePerSec=%d",
+				gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec())
+			log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d",
+				gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec())
+			mu.Unlock()
+
+			cachedblock, _ := db.GetProperty("leveldb.cachedblock")
+			openedtables, _ := db.GetProperty("leveldb.openedtables")
+			alivesnaps, _ := db.GetProperty("leveldb.alivesnaps")
+			aliveiters, _ := db.GetProperty("leveldb.aliveiters")
+			blockpool, _ := db.GetProperty("leveldb.blockpool")
+			log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q",
+				cachedblock, openedtables, alivesnaps, aliveiters, blockpool)
+
+			log.Print("------------------------")
+		}
+	}()
+
+	for ns, numKey := range numKeys {
+		func(ns, numKey int) {
+			log.Printf("[%02d] STARTING: numKey=%d", ns, numKey)
+
+			keys := make([][]byte, numKey)
+			for i := range keys {
+				keys[i] = randomData(nil, byte(ns), 1, uint32(i))
+			}
+
+			wg.Add(1)
+			go func() {
+				var wi uint32
+				defer func() {
+					log.Printf("[%02d] WRITER DONE #%d", ns, wi)
+					wg.Done()
+				}()
+
+				var (
+					b       = new(leveldb.Batch)
+					k2, v2  []byte
+					nReader int32
+				)
+				for atomic.LoadUint32(&done) == 0 {
+					log.Printf("[%02d] WRITER #%d", ns, wi)
+
+					b.Reset()
+					for _, k1 := range keys {
+						k2 = randomData(k2, byte(ns), 2, wi)
+						v2 = randomData(v2, byte(ns), 3, wi)
+						b.Put(k2, v2)
+						b.Put(k1, k2)
+					}
+					writeReq <- b
+					if err := <-writeAck; err != nil {
+						fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err)
+					}
+
+					snap, err := db.GetSnapshot()
+					if err != nil {
+						fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err)
+					}
+
+					writeAckAck <- struct{}{}
+
+					wg.Add(1)
+					atomic.AddInt32(&nReader, 1)
+					go func(snapwi uint32, snap *leveldb.Snapshot) {
+						var (
+							ri       int
+							iterStat = &latencyStats{}
+							getStat  = &latencyStats{}
+						)
+						defer func() {
+							mu.Lock()
+							gGetStat.add(getStat)
+							gIterStat.add(iterStat)
+							mu.Unlock()
+
+							atomic.AddInt32(&nReader, -1)
+							log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg())
+							snap.Release()
+							wg.Done()
+						}()
+
+						stopi := snapwi + 3
+						for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 {
+							var n int
+							iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil)
+							iterStat.start()
+							for iter.Next() {
+								k1 := iter.Key()
+								k2 := iter.Value()
+								iterStat.record(1)
+
+								if dataNS(k2) != byte(ns) {
+									fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2))
+								}
+
+								kwritei := dataI(k2)
+								if kwritei != snapwi {
+									fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei)
+								}
+
+								getStat.start()
+								_, err := snap.Get(k2, nil)
+								if err != nil {
+									fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
+								}
+								getStat.record(1)
+
+								n++
+								iterStat.start()
+							}
+							iter.Release()
+							if err := iter.Error(); err != nil {
+								fatalf(nil, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err)
+							}
+							if n != numKey {
+								fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n)
+							}
+
+							ri++
+						}
+					}(wi, snap)
+
+					atomic.AddUint32(&wi, 1)
+				}
+			}()
+
+			delB := new(leveldb.Batch)
+			wg.Add(1)
+			go func() {
+				var (
+					i        int
+					iterStat = &latencyStats{}
+				)
+				defer func() {
+					log.Printf("[%02d] SCANNER DONE #%d", ns, i)
+					wg.Done()
+				}()
+
+				time.Sleep(2 * time.Second)
+
+				for atomic.LoadUint32(&done) == 0 {
+					var n int
+					delB.Reset()
+					iter := db.NewIterator(dataNsSlice(byte(ns)), nil)
+					iterStat.start()
+					for iter.Next() && atomic.LoadUint32(&done) == 0 {
+						k := iter.Key()
+						v := iter.Value()
+						iterStat.record(1)
+
+						for ci, x := range [...][]byte{k, v} {
+							checksum0, checksum1 := dataChecksum(x)
+							if checksum0 != checksum1 {
+								if ci == 0 {
+									fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
+								} else {
+									fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v)
+								}
+							}
+						}
+
+						if dataPrefix(k) == 2 || mrand.Int()%999 == 0 {
+							delB.Delete(k)
+						}
+
+						n++
+						iterStat.start()
+					}
+					iter.Release()
+					if err := iter.Error(); err != nil {
+						fatalf(nil, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err)
+					}
+
+					if n > 0 {
+						log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg())
+					}
+
+					if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 {
+						t := time.Now()
+						writeReq <- delB
+						if err := <-writeAck; err != nil {
+							fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err)
+						}
+						writeAckAck <- struct{}{}
+						log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t))
+					}
+
+					i++
+				}
+			}()
+		}(ns, numKey)
+	}
+
+	go func() {
+		sig := make(chan os.Signal)
+		signal.Notify(sig, os.Interrupt, os.Kill)
+		log.Printf("Got signal: %v, exiting...", <-sig)
+		atomic.StoreUint32(&done, 1)
+	}()
+
+	wg.Wait()
+}