blob: 59c496d54c62e90afe4cd057d81f412b10edef6b [file] [log] [blame]
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package testutil
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
. "github.com/onsi/gomega"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
storageMu sync.Mutex
storageUseFS bool = true
storageKeepFS bool = false
storageNum int
)
type StorageMode int
const (
ModeOpen StorageMode = 1 << iota
ModeCreate
ModeRemove
ModeRead
ModeWrite
ModeSync
ModeClose
)
const (
modeOpen = iota
modeCreate
modeRemove
modeRead
modeWrite
modeSync
modeClose
modeCount
)
const (
typeManifest = iota
typeJournal
typeTable
typeTemp
typeCount
)
const flattenCount = modeCount * typeCount
func flattenType(m StorageMode, t storage.FileType) int {
var x int
switch m {
case ModeOpen:
x = modeOpen
case ModeCreate:
x = modeCreate
case ModeRemove:
x = modeRemove
case ModeRead:
x = modeRead
case ModeWrite:
x = modeWrite
case ModeSync:
x = modeSync
case ModeClose:
x = modeClose
default:
panic("invalid storage mode")
}
x *= typeCount
switch t {
case storage.TypeManifest:
return x + typeManifest
case storage.TypeJournal:
return x + typeJournal
case storage.TypeTable:
return x + typeTable
case storage.TypeTemp:
return x + typeTemp
default:
panic("invalid file type")
}
}
func listFlattenType(m StorageMode, t storage.FileType) []int {
ret := make([]int, 0, flattenCount)
add := func(x int) {
x *= typeCount
switch {
case t&storage.TypeManifest != 0:
ret = append(ret, x+typeManifest)
case t&storage.TypeJournal != 0:
ret = append(ret, x+typeJournal)
case t&storage.TypeTable != 0:
ret = append(ret, x+typeTable)
case t&storage.TypeTemp != 0:
ret = append(ret, x+typeTemp)
}
}
switch {
case m&ModeOpen != 0:
add(modeOpen)
case m&ModeCreate != 0:
add(modeCreate)
case m&ModeRemove != 0:
add(modeRemove)
case m&ModeRead != 0:
add(modeRead)
case m&ModeWrite != 0:
add(modeWrite)
case m&ModeSync != 0:
add(modeSync)
case m&ModeClose != 0:
add(modeClose)
}
return ret
}
func packFile(num uint64, t storage.FileType) uint64 {
if num>>(64-typeCount) != 0 {
panic("overflow")
}
return num<<typeCount | uint64(t)
}
func unpackFile(x uint64) (uint64, storage.FileType) {
return x >> typeCount, storage.FileType(x) & storage.TypeAll
}
type emulatedError struct {
err error
}
func (err emulatedError) Error() string {
return fmt.Sprintf("emulated storage error: %v", err.err)
}
type storageLock struct {
s *Storage
r util.Releaser
}
func (l storageLock) Release() {
l.r.Release()
l.s.logI("storage lock released")
}
type reader struct {
f *file
storage.Reader
}
func (r *reader) Read(p []byte) (n int, err error) {
err = r.f.s.emulateError(ModeRead, r.f.Type())
if err == nil {
r.f.s.stall(ModeRead, r.f.Type())
n, err = r.Reader.Read(p)
}
r.f.s.count(ModeRead, r.f.Type(), n)
if err != nil && err != io.EOF {
r.f.s.logI("read error, num=%d type=%v n=%d err=%v", r.f.Num(), r.f.Type(), n, err)
}
return
}
func (r *reader) ReadAt(p []byte, off int64) (n int, err error) {
err = r.f.s.emulateError(ModeRead, r.f.Type())
if err == nil {
r.f.s.stall(ModeRead, r.f.Type())
n, err = r.Reader.ReadAt(p, off)
}
r.f.s.count(ModeRead, r.f.Type(), n)
if err != nil && err != io.EOF {
r.f.s.logI("readAt error, num=%d type=%v offset=%d n=%d err=%v", r.f.Num(), r.f.Type(), off, n, err)
}
return
}
func (r *reader) Close() (err error) {
return r.f.doClose(r.Reader)
}
type writer struct {
f *file
storage.Writer
}
func (w *writer) Write(p []byte) (n int, err error) {
err = w.f.s.emulateError(ModeWrite, w.f.Type())
if err == nil {
w.f.s.stall(ModeWrite, w.f.Type())
n, err = w.Writer.Write(p)
}
w.f.s.count(ModeWrite, w.f.Type(), n)
if err != nil && err != io.EOF {
w.f.s.logI("write error, num=%d type=%v n=%d err=%v", w.f.Num(), w.f.Type(), n, err)
}
return
}
func (w *writer) Sync() (err error) {
err = w.f.s.emulateError(ModeSync, w.f.Type())
if err == nil {
w.f.s.stall(ModeSync, w.f.Type())
err = w.Writer.Sync()
}
w.f.s.count(ModeSync, w.f.Type(), 0)
if err != nil {
w.f.s.logI("sync error, num=%d type=%v err=%v", w.f.Num(), w.f.Type(), err)
}
return
}
func (w *writer) Close() (err error) {
return w.f.doClose(w.Writer)
}
type file struct {
s *Storage
storage.File
}
func (f *file) pack() uint64 {
return packFile(f.Num(), f.Type())
}
func (f *file) assertOpen() {
ExpectWithOffset(2, f.s.opens).NotTo(HaveKey(f.pack()), "File open, num=%d type=%v writer=%v", f.Num(), f.Type(), f.s.opens[f.pack()])
}
func (f *file) doClose(closer io.Closer) (err error) {
err = f.s.emulateError(ModeClose, f.Type())
if err == nil {
f.s.stall(ModeClose, f.Type())
}
f.s.mu.Lock()
defer f.s.mu.Unlock()
if err == nil {
ExpectWithOffset(2, f.s.opens).To(HaveKey(f.pack()), "File closed, num=%d type=%v", f.Num(), f.Type())
err = closer.Close()
}
f.s.countNB(ModeClose, f.Type(), 0)
writer := f.s.opens[f.pack()]
if err != nil {
f.s.logISkip(1, "file close failed, num=%d type=%v writer=%v err=%v", f.Num(), f.Type(), writer, err)
} else {
f.s.logISkip(1, "file closed, num=%d type=%v writer=%v", f.Num(), f.Type(), writer)
delete(f.s.opens, f.pack())
}
return
}
func (f *file) Open() (r storage.Reader, err error) {
err = f.s.emulateError(ModeOpen, f.Type())
if err == nil {
f.s.stall(ModeOpen, f.Type())
}
f.s.mu.Lock()
defer f.s.mu.Unlock()
if err == nil {
f.assertOpen()
f.s.countNB(ModeOpen, f.Type(), 0)
r, err = f.File.Open()
}
if err != nil {
f.s.logI("file open failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
} else {
f.s.logI("file opened, num=%d type=%v", f.Num(), f.Type())
f.s.opens[f.pack()] = false
r = &reader{f, r}
}
return
}
func (f *file) Create() (w storage.Writer, err error) {
err = f.s.emulateError(ModeCreate, f.Type())
if err == nil {
f.s.stall(ModeCreate, f.Type())
}
f.s.mu.Lock()
defer f.s.mu.Unlock()
if err == nil {
f.assertOpen()
f.s.countNB(ModeCreate, f.Type(), 0)
w, err = f.File.Create()
}
if err != nil {
f.s.logI("file create failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
} else {
f.s.logI("file created, num=%d type=%v", f.Num(), f.Type())
f.s.opens[f.pack()] = true
w = &writer{f, w}
}
return
}
func (f *file) Remove() (err error) {
err = f.s.emulateError(ModeRemove, f.Type())
if err == nil {
f.s.stall(ModeRemove, f.Type())
}
f.s.mu.Lock()
defer f.s.mu.Unlock()
if err == nil {
f.assertOpen()
f.s.countNB(ModeRemove, f.Type(), 0)
err = f.File.Remove()
}
if err != nil {
f.s.logI("file remove failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
} else {
f.s.logI("file removed, num=%d type=%v", f.Num(), f.Type())
}
return
}
type Storage struct {
storage.Storage
closeFn func() error
lmu sync.Mutex
lb bytes.Buffer
mu sync.Mutex
// Open files, true=writer, false=reader
opens map[uint64]bool
counters [flattenCount]int
bytesCounter [flattenCount]int64
emulatedError [flattenCount]error
stallCond sync.Cond
stalled [flattenCount]bool
}
func (s *Storage) log(skip int, str string) {
s.lmu.Lock()
defer s.lmu.Unlock()
_, file, line, ok := runtime.Caller(skip + 2)
if ok {
// Truncate file name at last file name separator.
if index := strings.LastIndex(file, "/"); index >= 0 {
file = file[index+1:]
} else if index = strings.LastIndex(file, "\\"); index >= 0 {
file = file[index+1:]
}
} else {
file = "???"
line = 1
}
fmt.Fprintf(&s.lb, "%s:%d: ", file, line)
lines := strings.Split(str, "\n")
if l := len(lines); l > 1 && lines[l-1] == "" {
lines = lines[:l-1]
}
for i, line := range lines {
if i > 0 {
s.lb.WriteString("\n\t")
}
s.lb.WriteString(line)
}
s.lb.WriteByte('\n')
}
func (s *Storage) logISkip(skip int, format string, args ...interface{}) {
pc, _, _, ok := runtime.Caller(skip + 1)
if ok {
if f := runtime.FuncForPC(pc); f != nil {
fname := f.Name()
if index := strings.LastIndex(fname, "."); index >= 0 {
fname = fname[index+1:]
}
format = fname + ": " + format
}
}
s.log(skip+1, fmt.Sprintf(format, args...))
}
func (s *Storage) logI(format string, args ...interface{}) {
s.logISkip(1, format, args...)
}
func (s *Storage) Log(str string) {
s.log(1, "Log: "+str)
s.Storage.Log(str)
}
func (s *Storage) Lock() (r util.Releaser, err error) {
r, err = s.Storage.Lock()
if err != nil {
s.logI("storage locking failed, err=%v", err)
} else {
s.logI("storage locked")
r = storageLock{s, r}
}
return
}
func (s *Storage) GetFile(num uint64, t storage.FileType) storage.File {
return &file{s, s.Storage.GetFile(num, t)}
}
func (s *Storage) GetFiles(t storage.FileType) (files []storage.File, err error) {
rfiles, err := s.Storage.GetFiles(t)
if err != nil {
s.logI("get files failed, err=%v", err)
return
}
files = make([]storage.File, len(rfiles))
for i, f := range rfiles {
files[i] = &file{s, f}
}
s.logI("get files, type=0x%x count=%d", int(t), len(files))
return
}
func (s *Storage) GetManifest() (f storage.File, err error) {
manifest, err := s.Storage.GetManifest()
if err != nil {
if !os.IsNotExist(err) {
s.logI("get manifest failed, err=%v", err)
}
return
}
s.logI("get manifest, num=%d", manifest.Num())
return &file{s, manifest}, nil
}
func (s *Storage) SetManifest(f storage.File) error {
f_, ok := f.(*file)
ExpectWithOffset(1, ok).To(BeTrue())
ExpectWithOffset(1, f_.Type()).To(Equal(storage.TypeManifest))
err := s.Storage.SetManifest(f_.File)
if err != nil {
s.logI("set manifest failed, err=%v", err)
} else {
s.logI("set manifest, num=%d", f_.Num())
}
return err
}
func (s *Storage) openFiles() string {
out := "Open files:"
for x, writer := range s.opens {
num, t := unpackFile(x)
out += fmt.Sprintf("\n ยท num=%d type=%v writer=%v", num, t, writer)
}
return out
}
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
err := s.Storage.Close()
if err != nil {
s.logI("storage closing failed, err=%v", err)
} else {
s.logI("storage closed")
}
if s.closeFn != nil {
if err1 := s.closeFn(); err1 != nil {
s.logI("close func error, err=%v", err1)
}
}
return err
}
func (s *Storage) countNB(m StorageMode, t storage.FileType, n int) {
s.counters[flattenType(m, t)]++
s.bytesCounter[flattenType(m, t)] += int64(n)
}
func (s *Storage) count(m StorageMode, t storage.FileType, n int) {
s.mu.Lock()
defer s.mu.Unlock()
s.countNB(m, t, n)
}
func (s *Storage) ResetCounter(m StorageMode, t storage.FileType) {
for _, x := range listFlattenType(m, t) {
s.counters[x] = 0
s.bytesCounter[x] = 0
}
}
func (s *Storage) Counter(m StorageMode, t storage.FileType) (count int, bytes int64) {
for _, x := range listFlattenType(m, t) {
count += s.counters[x]
bytes += s.bytesCounter[x]
}
return
}
func (s *Storage) emulateError(m StorageMode, t storage.FileType) error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.emulatedError[flattenType(m, t)]
if err != nil {
return emulatedError{err}
}
return nil
}
func (s *Storage) EmulateError(m StorageMode, t storage.FileType, err error) {
s.mu.Lock()
defer s.mu.Unlock()
for _, x := range listFlattenType(m, t) {
s.emulatedError[x] = err
}
}
func (s *Storage) stall(m StorageMode, t storage.FileType) {
x := flattenType(m, t)
s.mu.Lock()
defer s.mu.Unlock()
for s.stalled[x] {
s.stallCond.Wait()
}
}
func (s *Storage) Stall(m StorageMode, t storage.FileType) {
s.mu.Lock()
defer s.mu.Unlock()
for _, x := range listFlattenType(m, t) {
s.stalled[x] = true
}
}
func (s *Storage) Release(m StorageMode, t storage.FileType) {
s.mu.Lock()
defer s.mu.Unlock()
for _, x := range listFlattenType(m, t) {
s.stalled[x] = false
}
s.stallCond.Broadcast()
}
func NewStorage() *Storage {
var stor storage.Storage
var closeFn func() error
if storageUseFS {
for {
storageMu.Lock()
num := storageNum
storageNum++
storageMu.Unlock()
path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
if _, err := os.Stat(path); os.IsNotExist(err) {
stor, err = storage.OpenFile(path)
ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path)
closeFn = func() error {
if storageKeepFS {
return nil
}
return os.RemoveAll(path)
}
break
}
}
} else {
stor = storage.NewMemStorage()
}
s := &Storage{
Storage: stor,
closeFn: closeFn,
opens: make(map[uint64]bool),
}
s.stallCond.L = &s.mu
return s
}