storage: revamp storage api
diff --git a/leveldb/batch.go b/leveldb/batch.go
index ccf390c..43ea7b5 100644
--- a/leveldb/batch.go
+++ b/leveldb/batch.go
@@ -12,6 +12,7 @@
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
type ErrBatchCorrupted struct {
@@ -23,7 +24,7 @@
}
func newErrBatchCorrupted(reason string) error {
- return errors.NewErrCorrupted(nil, &ErrBatchCorrupted{reason})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
}
const (
diff --git a/leveldb/corrupt_test.go b/leveldb/corrupt_test.go
index a351874..fef2026 100644
--- a/leveldb/corrupt_test.go
+++ b/leveldb/corrupt_test.go
@@ -9,12 +9,13 @@
import (
"bytes"
"fmt"
- "github.com/syndtr/goleveldb/leveldb/filter"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
"io"
"math/rand"
"testing"
+
+ "github.com/syndtr/goleveldb/leveldb/filter"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
const ctValSize = 1000
@@ -99,19 +100,17 @@
p := &h.dbHarness
t := p.t
- ff, _ := p.stor.GetFiles(ft)
- sff := files(ff)
- sff.sort()
+ fds, _ := p.stor.List(ft)
+ sortFds(fds)
if fi < 0 {
- fi = len(sff) - 1
+ fi = len(fds) - 1
}
- if fi >= len(sff) {
+ if fi >= len(fds) {
t.Fatalf("no such file with type %q with index %d", ft, fi)
}
- file := sff[fi]
-
- r, err := file.Open()
+ fd := fds[fi]
+ r, err := h.stor.Open(fd)
if err != nil {
t.Fatal("cannot open file: ", err)
}
@@ -149,11 +148,11 @@
buf[offset+i] ^= 0x80
}
- err = file.Remove()
+ err = h.stor.Remove(fd)
if err != nil {
t.Fatal("cannot remove old file: ", err)
}
- w, err := file.Create()
+ w, err := h.stor.Create(fd)
if err != nil {
t.Fatal("cannot create new file: ", err)
}
@@ -165,25 +164,37 @@
}
func (h *dbCorruptHarness) removeAll(ft storage.FileType) {
- ff, err := h.stor.GetFiles(ft)
+ fds, err := h.stor.List(ft)
if err != nil {
h.t.Fatal("get files: ", err)
}
- for _, f := range ff {
- if err := f.Remove(); err != nil {
+ for _, fd := range fds {
+ if err := h.stor.Remove(fd); err != nil {
+ h.t.Error("remove file: ", err)
+ }
+ }
+}
+
+func (h *dbCorruptHarness) forceRemoveAll(ft storage.FileType) {
+ fds, err := h.stor.List(ft)
+ if err != nil {
+ h.t.Fatal("get files: ", err)
+ }
+ for _, fd := range fds {
+ if err := h.stor.ForceRemove(fd); err != nil {
h.t.Error("remove file: ", err)
}
}
}
func (h *dbCorruptHarness) removeOne(ft storage.FileType) {
- ff, err := h.stor.GetFiles(ft)
+ fds, err := h.stor.List(ft)
if err != nil {
h.t.Fatal("get files: ", err)
}
- f := ff[rand.Intn(len(ff))]
- h.t.Logf("removing file @%d", f.Num())
- if err := f.Remove(); err != nil {
+ fd := fds[rand.Intn(len(fds))]
+ h.t.Logf("removing file @%d", fd.Num)
+ if err := h.stor.Remove(fd); err != nil {
h.t.Error("remove file: ", err)
}
}
@@ -221,6 +232,7 @@
func TestCorruptDB_Journal(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.build(100)
h.check(100, 100)
@@ -230,12 +242,11 @@
h.openDB()
h.check(36, 36)
-
- h.close()
}
func TestCorruptDB_Table(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.build(100)
h.compactMem()
@@ -246,12 +257,11 @@
h.openDB()
h.check(99, 99)
-
- h.close()
}
func TestCorruptDB_TableIndex(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.build(10000)
h.compactMem()
@@ -260,8 +270,6 @@
h.openDB()
h.check(5000, 9999)
-
- h.close()
}
func TestCorruptDB_MissingManifest(t *testing.T) {
@@ -271,6 +279,7 @@
Strict: opt.StrictJournalChecksum,
WriteBuffer: 1000 * 60,
})
+ defer h.close()
h.build(1000)
h.compactMem()
@@ -286,10 +295,8 @@
h.compactMem()
h.closeDB()
- h.stor.SetIgnoreOpenErr(storage.TypeManifest)
- h.removeAll(storage.TypeManifest)
+ h.forceRemoveAll(storage.TypeManifest)
h.openAssert(false)
- h.stor.SetIgnoreOpenErr(0)
h.recover()
h.check(1000, 1000)
@@ -300,12 +307,11 @@
h.recover()
h.check(1000, 1000)
-
- h.close()
}
func TestCorruptDB_SequenceNumberRecovery(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("foo", "v1")
h.put("foo", "v2")
@@ -321,12 +327,11 @@
h.reopenDB()
h.getVal("foo", "v6")
-
- h.close()
}
func TestCorruptDB_SequenceNumberRecoveryTable(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("foo", "v1")
h.put("foo", "v2")
@@ -344,12 +349,11 @@
h.reopenDB()
h.getVal("foo", "v6")
-
- h.close()
}
func TestCorruptDB_CorruptedManifest(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("foo", "hello")
h.compactMem()
@@ -360,12 +364,11 @@
h.recover()
h.getVal("foo", "hello")
-
- h.close()
}
func TestCorruptDB_CompactionInputError(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.build(10)
h.compactMem()
@@ -377,12 +380,11 @@
h.build(10000)
h.check(10000, 10000)
-
- h.close()
}
func TestCorruptDB_UnrelatedKeys(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.build(10)
h.compactMem()
@@ -394,12 +396,11 @@
h.getVal(string(tkey(1000)), string(tval(1000, ctValSize)))
h.compactMem()
h.getVal(string(tkey(1000)), string(tval(1000, ctValSize)))
-
- h.close()
}
func TestCorruptDB_Level0NewerFileHasOlderSeqnum(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("a", "v1")
h.put("b", "v1")
@@ -421,12 +422,11 @@
h.getVal("b", "v3")
h.getVal("c", "v0")
h.getVal("d", "v0")
-
- h.close()
}
func TestCorruptDB_RecoverInvalidSeq_Issue53(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("a", "v1")
h.put("b", "v1")
@@ -448,12 +448,11 @@
h.getVal("b", "v3")
h.getVal("c", "v0")
h.getVal("d", "v0")
-
- h.close()
}
func TestCorruptDB_MissingTableFiles(t *testing.T) {
h := newDbCorruptHarness(t)
+ defer h.close()
h.put("a", "v1")
h.put("b", "v1")
@@ -467,8 +466,6 @@
h.removeOne(storage.TypeTable)
h.openAssert(false)
-
- h.close()
}
func TestCorruptDB_RecoverTable(t *testing.T) {
@@ -477,6 +474,7 @@
CompactionTableSize: 90 * opt.KiB,
Filter: filter.NewBloomFilter(10),
})
+ defer h.close()
h.build(1000)
h.compactMem()
@@ -495,6 +493,4 @@
t.Errorf("invalid seq, want=%d got=%d", seq, h.db.seq)
}
h.check(985, 985)
-
- h.close()
}
diff --git a/leveldb/db.go b/leveldb/db.go
index 709a926..f111220 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -36,14 +36,14 @@
s *session
// MemDB.
- memMu sync.RWMutex
- memPool chan *memdb.DB
- mem, frozenMem *memDB
- journal *journal.Writer
- journalWriter storage.Writer
- journalFile storage.File
- frozenJournalFile storage.File
- frozenSeq uint64
+ memMu sync.RWMutex
+ memPool chan *memdb.DB
+ mem, frozenMem *memDB
+ journal *journal.Writer
+ journalWriter storage.Writer
+ journalFd storage.FileDesc
+ frozenJournalFd storage.FileDesc
+ frozenSeq uint64
// Snapshot.
snapsMu sync.Mutex
@@ -278,12 +278,11 @@
o.Strict &= ^opt.StrictReader
// Get all tables and sort it by file number.
- tableFiles_, err := s.getFiles(storage.TypeTable)
+ fds, err := s.stor.List(storage.TypeTable)
if err != nil {
return err
}
- tableFiles := files(tableFiles_)
- tableFiles.sort()
+ sortFds(fds)
var (
maxSeq uint64
@@ -296,17 +295,17 @@
rec = &sessionRecord{}
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
)
- buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
- tmp = s.newTemp()
- writer, err := tmp.Create()
+ buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
+ tmpFd = s.newTemp()
+ writer, err := s.stor.Create(tmpFd)
if err != nil {
return
}
defer func() {
writer.Close()
if err != nil {
- tmp.Remove()
- tmp = nil
+ s.stor.Remove(tmpFd)
+ tmpFd = storage.FileDesc{}
}
}()
@@ -338,9 +337,9 @@
size = int64(tw.BytesLen())
return
}
- recoverTable := func(file storage.File) error {
- s.logf("table@recovery recovering @%d", file.Num())
- reader, err := file.Open()
+ recoverTable := func(fd storage.FileDesc) error {
+ s.logf("table@recovery recovering @%d", fd.Num)
+ reader, err := s.stor.Open(fd)
if err != nil {
return err
}
@@ -362,7 +361,7 @@
tgoodKey, tcorruptedKey, tcorruptedBlock int
imin, imax []byte
)
- tr, err := table.NewReader(reader, size, storage.NewFileInfo(file), nil, bpool, o)
+ tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
if err != nil {
return err
}
@@ -370,7 +369,7 @@
if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
itererr.SetErrorCallback(func(err error) {
if errors.IsCorrupted(err) {
- s.logf("table@recovery block corruption @%d %q", file.Num(), err)
+ s.logf("table@recovery block corruption @%d %q", fd.Num, err)
tcorruptedBlock++
}
})
@@ -405,23 +404,23 @@
if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
droppedTable++
- s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
+ s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
return nil
}
if tgoodKey > 0 {
if tcorruptedKey > 0 || tcorruptedBlock > 0 {
// Rebuild the table.
- s.logf("table@recovery rebuilding @%d", file.Num())
+ s.logf("table@recovery rebuilding @%d", fd.Num)
iter := tr.NewIterator(nil, nil)
- tmp, newSize, err := buildTable(iter)
+ tmpFd, newSize, err := buildTable(iter)
iter.Release()
if err != nil {
return err
}
closed = true
reader.Close()
- if err := file.Replace(tmp); err != nil {
+ if err := s.stor.Rename(tmpFd, fd); err != nil {
return err
}
size = newSize
@@ -431,30 +430,30 @@
}
recoveredKey += tgoodKey
// Add table to level 0.
- rec.addTable(0, file.Num(), uint64(size), imin, imax)
- s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
+ rec.addTable(0, fd.Num, size, imin, imax)
+ s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
} else {
droppedTable++
- s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", file.Num(), tcorruptedKey, tcorruptedBlock, size)
+ s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
}
return nil
}
// Recover all tables.
- if len(tableFiles) > 0 {
- s.logf("table@recovery F·%d", len(tableFiles))
+ if len(fds) > 0 {
+ s.logf("table@recovery F·%d", len(fds))
// Mark file number as used.
- s.markFileNum(tableFiles[len(tableFiles)-1].Num())
+ s.markFileNum(fds[len(fds)-1].Num)
- for _, file := range tableFiles {
- if err := recoverTable(file); err != nil {
+ for _, fd := range fds {
+ if err := recoverTable(fd); err != nil {
return err
}
}
- s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(tableFiles), recoveredKey, goodKey, corruptedKey, maxSeq)
+ s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
}
// Set sequence number.
@@ -471,31 +470,31 @@
func (db *DB) recoverJournal() error {
// Get all journals and sort it by file number.
- allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
+ fds_, err := db.s.stor.List(storage.TypeJournal)
if err != nil {
return err
}
- files(allJournalFiles).sort()
+ sortFds(fds_)
// Journals that will be recovered.
- var recJournalFiles []storage.File
- for _, jf := range allJournalFiles {
- if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
- recJournalFiles = append(recJournalFiles, jf)
+ var fds []storage.FileDesc
+ for _, fd := range fds_ {
+ if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
+ fds = append(fds, fd)
}
}
var (
- of storage.File // Obsolete file.
+ ofd storage.FileDesc // Obsolete file.
rec = &sessionRecord{}
)
// Recover journals.
- if len(recJournalFiles) > 0 {
- db.logf("journal@recovery F·%d", len(recJournalFiles))
+ if len(fds) > 0 {
+ db.logf("journal@recovery F·%d", len(fds))
// Mark file number as used.
- db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num())
+ db.s.markFileNum(fds[len(fds)-1].Num)
var (
// Options.
@@ -509,23 +508,23 @@
batch = &Batch{}
)
- for _, jf := range recJournalFiles {
- db.logf("journal@recovery recovering @%d", jf.Num())
+ for _, fd := range fds {
+ db.logf("journal@recovery recovering @%d", fd.Num)
- fr, err := jf.Open()
+ fr, err := db.s.stor.Open(fd)
if err != nil {
return err
}
// Create or reset journal reader instance.
if jr == nil {
- jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+ jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
} else {
- jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+ jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
}
// Flush memdb and remove obsolete journal file.
- if of != nil {
+ if !ofd.Nil() {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
@@ -533,7 +532,7 @@
}
}
- rec.setJournalNum(jf.Num())
+ rec.setJournalNum(fd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
fr.Close()
@@ -541,8 +540,8 @@
}
rec.resetAddedTables()
- of.Remove()
- of = nil
+ db.s.stor.Remove(ofd)
+ ofd = storage.FileDesc{}
}
// Replay journal to memdb.
@@ -555,7 +554,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
buf.Reset()
@@ -566,7 +565,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
@@ -576,7 +575,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
// Save sequence number.
@@ -594,7 +593,7 @@
}
fr.Close()
- of = jf
+ ofd = fd
}
// Flush the last memdb.
@@ -611,7 +610,7 @@
}
// Commit.
- rec.setJournalNum(db.journalFile.Num())
+ rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
// Close journal on error.
@@ -623,8 +622,8 @@
}
// Remove the last obsolete journal file.
- if of != nil {
- of.Remove()
+ if !ofd.Nil() {
+ db.s.stor.Remove(ofd)
}
return nil
@@ -632,17 +631,17 @@
func (db *DB) recoverJournalRO() error {
// Get all journals and sort it by file number.
- allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
+ fds_, err := db.s.stor.List(storage.TypeJournal)
if err != nil {
return err
}
- files(allJournalFiles).sort()
+ sortFds(fds_)
// Journals that will be recovered.
- var recJournalFiles []storage.File
- for _, jf := range allJournalFiles {
- if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
- recJournalFiles = append(recJournalFiles, jf)
+ var fds []storage.FileDesc
+ for _, fd := range fds_ {
+ if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
+ fds = append(fds, fd)
}
}
@@ -656,8 +655,8 @@
)
// Recover journals.
- if len(recJournalFiles) > 0 {
- db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
+ if len(fds) > 0 {
+ db.logf("journal@recovery RO·Mode F·%d", len(fds))
var (
jr *journal.Reader
@@ -665,19 +664,19 @@
batch = &Batch{}
)
- for _, jf := range recJournalFiles {
- db.logf("journal@recovery recovering @%d", jf.Num())
+ for _, fd := range fds {
+ db.logf("journal@recovery recovering @%d", fd.Num)
- fr, err := jf.Open()
+ fr, err := db.s.stor.Open(fd)
if err != nil {
return err
}
// Create or reset journal reader instance.
if jr == nil {
- jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+ jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
} else {
- jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+ jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
}
// Replay journal to memdb.
@@ -689,7 +688,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
buf.Reset()
@@ -700,7 +699,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
@@ -710,7 +709,7 @@
}
fr.Close()
- return errors.SetFile(err, jf)
+ return errors.SetFd(err, fd)
}
// Save sequence number.
@@ -942,7 +941,7 @@
for level, tables := range v.levels {
value += fmt.Sprintf("--- level %d ---\n", level)
for _, t := range tables {
- value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
+ value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
}
}
case p == "blockpool":
@@ -1063,8 +1062,6 @@
db.frozenMem = nil
db.journal = nil
db.journalWriter = nil
- db.journalFile = nil
- db.frozenJournalFile = nil
db.closer = nil
return err
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 065ca20..4243dd1 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -12,6 +12,7 @@
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
var (
@@ -20,8 +21,8 @@
type cStat struct {
duration time.Duration
- read uint64
- write uint64
+ read int64
+ write int64
}
func (p *cStat) add(n *cStatStaging) {
@@ -30,7 +31,7 @@
p.write += n.write
}
-func (p *cStat) get() (duration time.Duration, read, write uint64) {
+func (p *cStat) get() (duration time.Duration, read, write int64) {
return p.duration, p.read, p.write
}
@@ -38,8 +39,8 @@
start time.Time
duration time.Duration
on bool
- read uint64
- write uint64
+ read int64
+ write int64
}
func (p *cStatStaging) startTimer() {
@@ -72,7 +73,7 @@
p.lk.Unlock()
}
-func (p *cStats) getStat(level int) (duration time.Duration, read, write uint64) {
+func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
p.lk.Lock()
defer p.lk.Unlock()
if level < len(p.stats) {
@@ -297,8 +298,7 @@
}, func() error {
for _, r := range rec.addedTables {
db.logf("memdb@flush revert @%d", r.num)
- f := db.s.getTableFile(r.num)
- if err := f.Remove(); err != nil {
+ if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
return err
}
}
@@ -307,7 +307,7 @@
db.compactionTransactFunc("memdb@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
- rec.setJournalNum(db.journalFile.Num())
+ rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.frozenSeq)
err = db.s.commit(rec)
stats.stopTimer()
@@ -399,7 +399,7 @@
}
b.rec.addTableFile(b.c.sourceLevel+1, t)
b.stat1.write += t.size
- b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.file.Num(), b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
+ b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
b.tw = nil
return nil
}
@@ -522,8 +522,7 @@
func (b *tableCompactionBuilder) revert() error {
for _, at := range b.rec.addedTables {
b.s.logf("table@build revert @%d", at.num)
- f := b.s.getTableFile(at.num)
- if err := f.Remove(); err != nil {
+ if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
return err
}
}
@@ -538,8 +537,8 @@
if !noTrivial && c.trivial() {
t := c.levels[0][0]
- db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.file.Num(), c.sourceLevel+1)
- rec.delTable(c.sourceLevel, t.file.Num())
+ db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
+ rec.delTable(c.sourceLevel, t.fd.Num)
rec.addTableFile(c.sourceLevel+1, t)
db.compactionTransactFunc("table@move", func(cnt *compactionTransactCounter) (err error) {
return db.s.commit(rec)
@@ -552,7 +551,7 @@
for _, t := range tables {
stats[i].read += t.size
// Insert deleted tables into record
- rec.delTable(c.sourceLevel+i, t.file.Num())
+ rec.delTable(c.sourceLevel+i, t.fd.Num)
}
}
sourceSize := int(stats[0].read + stats[1].read)
diff --git a/leveldb/db_state.go b/leveldb/db_state.go
index 24671dd..e763013 100644
--- a/leveldb/db_state.go
+++ b/leveldb/db_state.go
@@ -12,6 +12,7 @@
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
type memDB struct {
@@ -95,11 +96,10 @@
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memDB, err error) {
- num := db.s.allocFileNum()
- file := db.s.getJournalFile(num)
- w, err := file.Create()
+ fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
+ w, err := db.s.stor.Create(fd)
if err != nil {
- db.s.reuseFileNum(num)
+ db.s.reuseFileNum(fd.Num)
return
}
@@ -115,10 +115,10 @@
} else {
db.journal.Reset(w)
db.journalWriter.Close()
- db.frozenJournalFile = db.journalFile
+ db.frozenJournalFd = db.journalFd
}
db.journalWriter = w
- db.journalFile = file
+ db.journalFd = fd
db.frozenMem = db.mem
mdb := db.mpoolGet()
if mdb == nil || mdb.Capacity() < n {
@@ -181,12 +181,12 @@
// Drop frozen memdb; assume that frozen memdb isn't nil.
func (db *DB) dropFrozenMem() {
db.memMu.Lock()
- if err := db.frozenJournalFile.Remove(); err != nil {
- db.logf("journal@remove removing @%d %q", db.frozenJournalFile.Num(), err)
+ if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
+ db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
} else {
- db.logf("journal@remove removed @%d", db.frozenJournalFile.Num())
+ db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
}
- db.frozenJournalFile = nil
+ db.frozenJournalFd = storage.FileDesc{}
db.frozenMem.decref()
db.frozenMem = nil
db.memMu.Unlock()
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 8ec6629..b98ebf2 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -23,12 +23,15 @@
"time"
"unsafe"
+ "github.com/onsi/gomega"
+
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
+ "github.com/syndtr/goleveldb/leveldb/testutil"
"github.com/syndtr/goleveldb/leveldb/util"
)
@@ -41,10 +44,23 @@
return randomString(r, n)
}
+func testingLogger(t *testing.T) func(log string) {
+ return func(log string) {
+ t.Log(log)
+ }
+}
+
+func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
+ return func() (preserve bool, err error) {
+ preserve = t.Failed()
+ return
+ }
+}
+
type dbHarness struct {
t *testing.T
- stor *testStorage
+ stor *testutil.Storage
db *DB
o *opt.Options
ro *opt.ReadOptions
@@ -62,8 +78,11 @@
}
func (h *dbHarness) init(t *testing.T, o *opt.Options) {
+ gomega.RegisterTestingT(t)
h.t = t
- h.stor = newTestStorage(t)
+ h.stor = testutil.NewStorage()
+ h.stor.OnLog(testingLogger(t))
+ h.stor.OnClose(testingPreserveOnFailed(t))
h.o = o
h.ro = nil
h.wo = nil
@@ -93,21 +112,28 @@
}
func (h *dbHarness) closeDB() {
- if err := h.closeDB0(); err != nil {
- h.t.Error("Close: got error: ", err)
+ if h.db != nil {
+ if err := h.closeDB0(); err != nil {
+ h.t.Error("Close: got error: ", err)
+ }
+ h.db = nil
}
h.stor.CloseCheck()
runtime.GC()
}
func (h *dbHarness) reopenDB() {
- h.closeDB()
+ if h.db != nil {
+ h.closeDB()
+ }
h.openDB()
}
func (h *dbHarness) close() {
- h.closeDB0()
- h.db = nil
+ if h.db != nil {
+ h.closeDB0()
+ h.db = nil
+ }
h.stor.Close()
h.stor = nil
runtime.GC()
@@ -149,12 +175,12 @@
}
}
-func (h *dbHarness) maxNextLevelOverlappingBytes(want uint64) {
+func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
t := h.t
db := h.db
var (
- maxOverlaps uint64
+ maxOverlaps int64
maxLevel int
)
v := db.s.version()
@@ -604,18 +630,18 @@
h.put("foo", "v1")
h.getVal("foo", "v1")
- h.stor.DelaySync(storage.TypeTable) // Block sync calls
- h.put("k1", strings.Repeat("x", 100000)) // Fill memtable
- h.put("k2", strings.Repeat("y", 100000)) // Trigger compaction
+ h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls
+ h.put("k1", strings.Repeat("x", 100000)) // Fill memtable
+ h.put("k2", strings.Repeat("y", 100000)) // Trigger compaction
for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
time.Sleep(10 * time.Microsecond)
}
if h.db.getFrozenMem() == nil {
- h.stor.ReleaseSync(storage.TypeTable)
+ h.stor.Release(testutil.ModeSync, storage.TypeTable)
t.Fatal("No frozen mem")
}
h.getVal("foo", "v1")
- h.stor.ReleaseSync(storage.TypeTable) // Release sync calls
+ h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls
h.reopenDB()
h.getVal("foo", "v1")
@@ -872,11 +898,11 @@
func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
truno(t, &opt.Options{WriteBuffer: 1000000}, func(h *dbHarness) {
- h.stor.DelaySync(storage.TypeTable)
+ h.stor.Stall(testutil.ModeSync, storage.TypeTable)
h.put("big1", strings.Repeat("x", 10000000))
h.put("big2", strings.Repeat("y", 1000))
h.put("bar", "v2")
- h.stor.ReleaseSync(storage.TypeTable)
+ h.stor.Release(testutil.ModeSync, storage.TypeTable)
h.reopenDB()
h.getVal("bar", "v2")
@@ -1308,14 +1334,14 @@
t.Errorf("total tables is %d, want %d", n, im*2)
}
- h.stor.SetEmuErr(storage.TypeTable, tsOpOpen)
+ h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
go h.db.CompactRange(util.Range{})
if err := h.db.compSendIdle(h.db.tcompCmdC); err != nil {
t.Log("compaction error: ", err)
}
h.closeDB0()
h.openDB()
- h.stor.SetEmuErr(0, tsOpOpen)
+ h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)
for i := 0; i < im; i++ {
for j := 0; j < jm; j++ {
@@ -1450,17 +1476,17 @@
v.release()
if i == 0 {
- h.stor.SetEmuErr(storage.TypeManifest, tsOpWrite)
+ h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
} else {
- h.stor.SetEmuErr(storage.TypeManifest, tsOpSync)
+ h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
}
// Merging compaction (will fail)
h.compactRangeAtErr(0, "", "", true)
h.db.Close()
- h.stor.SetEmuErr(0, tsOpWrite)
- h.stor.SetEmuErr(0, tsOpSync)
+ h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
+ h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)
// Should not lose data
h.openDB()
@@ -1667,32 +1693,31 @@
h.compactMem()
// Prevent auto compactions triggered by seeks
- h.stor.DelaySync(storage.TypeTable)
+ h.stor.Stall(testutil.ModeSync, storage.TypeTable)
// Lookup present keys. Should rarely read from small sstable.
- h.stor.SetReadCounter(storage.TypeTable)
+ h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
for i := 0; i < n; i++ {
h.getVal(key(i), key(i))
}
- cnt := int(h.stor.ReadCounter())
+ cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
-
if min, max := n, n+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}
// Lookup missing keys. Should rarely read from either sstable.
- h.stor.ResetReadCounter()
+ h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
for i := 0; i < n; i++ {
h.get(key(i)+".missing", false)
}
- cnt = int(h.stor.ReadCounter())
+ cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
if max := 3 * n / 100; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
}
- h.stor.ReleaseSync(storage.TypeTable)
+ h.stor.Release(testutil.ModeSync, storage.TypeTable)
}
func TestDB_Concurrent(t *testing.T) {
@@ -2236,10 +2261,10 @@
key := fmt.Sprintf("KEY%8d", k)
b.Put([]byte(key), []byte(key+vtail))
}
- h.stor.SetEmuRandErr(storage.TypeTable, tsOpOpen, tsOpRead, tsOpReadAt)
+ h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
if err := h.db.Write(b, nil); err != nil {
t.Logf("WRITE #%d error: %v", i, err)
- h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt, tsOpWrite)
+ h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
for {
if err := h.db.Write(b, nil); err == nil {
break
@@ -2255,10 +2280,10 @@
key := fmt.Sprintf("KEY%8d", k)
b.Delete([]byte(key))
}
- h.stor.SetEmuRandErr(storage.TypeTable, tsOpOpen, tsOpRead, tsOpReadAt)
+ h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
if err := h.db.Write(b, nil); err != nil {
t.Logf("WRITE #%d error: %v", i, err)
- h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt)
+ h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
for {
if err := h.db.Write(b, nil); err == nil {
break
@@ -2268,7 +2293,7 @@
}
}
}
- h.stor.SetEmuRandErr(0, tsOpOpen, tsOpRead, tsOpReadAt)
+ h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
runtime.GOMAXPROCS(runtime.NumCPU())
@@ -2369,7 +2394,7 @@
h.waitCompaction()
for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
- t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+ t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
}
}
@@ -2377,14 +2402,14 @@
h.waitCompaction()
for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
- t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+ t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
}
}
h.compactRangeAt(1, "", "")
h.waitCompaction()
for level, tables := range h.db.s.stVersion.levels {
for _, table := range tables {
- t.Logf("L%d@%d %q:%q", level, table.file.Num(), table.imin, table.imax)
+ t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
}
}
runtime.GOMAXPROCS(runtime.NumCPU())
@@ -2415,7 +2440,10 @@
}
func TestDB_TableCompactionBuilder(t *testing.T) {
- stor := newTestStorage(t)
+ gomega.RegisterTestingT(t)
+ stor := testutil.NewStorage()
+ stor.OnLog(testingLogger(t))
+ stor.OnClose(testingPreserveOnFailed(t))
defer stor.Close()
const nSeq = 99
@@ -2482,7 +2510,7 @@
t.Fatal(err)
}
for _, t := range c.levels[0] {
- rec.delTable(c.sourceLevel, t.file.Num())
+ rec.delTable(c.sourceLevel, t.fd.Num)
}
if err := s.commit(rec); err != nil {
t.Fatal(err)
@@ -2506,11 +2534,11 @@
t.Fatal(err)
}
for _, t := range c.levels[0] {
- rec.delTable(c.sourceLevel, t.file.Num())
+ rec.delTable(c.sourceLevel, t.fd.Num)
}
// Move grandparent to level-3
for _, t := range v.levels[2] {
- rec.delTable(2, t.file.Num())
+ rec.delTable(2, t.fd.Num)
rec.addTableFile(3, t)
}
if err := s.commit(rec); err != nil {
@@ -2528,7 +2556,7 @@
for i, f := range v.levels[1][:len(v.levels[1])-1] {
nf := v.levels[1][i+1]
if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
- t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.file.Num(), nf.file.Num())
+ t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
}
}
v.release()
@@ -2546,9 +2574,8 @@
strict: true,
tableSize: o.CompactionTableSize,
}
- stor.SetEmuErrOnce(storage.TypeTable, tsOpSync)
- stor.SetEmuRandErr(storage.TypeTable, tsOpRead, tsOpReadAt, tsOpWrite)
- stor.SetEmuRandErrProb(0xf0)
+ stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
+ stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
for {
if err := b.run(new(compactionTransactCounter)); err != nil {
t.Logf("(expected) b.run: %v", err)
@@ -2561,8 +2588,8 @@
}
c.release()
- stor.SetEmuErrOnce(0, tsOpSync)
- stor.SetEmuRandErr(0, tsOpRead, tsOpReadAt, tsOpWrite)
+ stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
+ stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)
v = s.version()
if len(v.levels[1]) != len(v.levels[2]) {
@@ -2696,7 +2723,8 @@
t.Fatalf("SetReadOnly error: %v", err)
}
- h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync)
+ mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
+ h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes"))
ro := func(key, value, wantValue string) {
if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
diff --git a/leveldb/db_util.go b/leveldb/db_util.go
index 23b866d..8ec86b2 100644
--- a/leveldb/db_util.go
+++ b/leveldb/db_util.go
@@ -40,59 +40,59 @@
v := db.s.version()
defer v.release()
- tablesMap := make(map[uint64]bool)
+ tmap := make(map[int64]bool)
for _, tables := range v.levels {
for _, t := range tables {
- tablesMap[t.file.Num()] = false
+ tmap[t.fd.Num] = false
}
}
- files, err := db.s.getFiles(storage.TypeAll)
+ fds, err := db.s.stor.List(storage.TypeAll)
if err != nil {
return err
}
- var nTables int
- var rem []storage.File
- for _, f := range files {
+ var nt int
+ var rem []storage.FileDesc
+ for _, fd := range fds {
keep := true
- switch f.Type() {
+ switch fd.Type {
case storage.TypeManifest:
- keep = f.Num() >= db.s.manifestFile.Num()
+ keep = fd.Num >= db.s.manifestFd.Num
case storage.TypeJournal:
- if db.frozenJournalFile != nil {
- keep = f.Num() >= db.frozenJournalFile.Num()
+ if !db.frozenJournalFd.Nil() {
+ keep = fd.Num >= db.frozenJournalFd.Num
} else {
- keep = f.Num() >= db.journalFile.Num()
+ keep = fd.Num >= db.journalFd.Num
}
case storage.TypeTable:
- _, keep = tablesMap[f.Num()]
+ _, keep = tmap[fd.Num]
if keep {
- tablesMap[f.Num()] = true
- nTables++
+ tmap[fd.Num] = true
+ nt++
}
}
if !keep {
- rem = append(rem, f)
+ rem = append(rem, fd)
}
}
- if nTables != len(tablesMap) {
- var missing []*storage.FileInfo
- for num, present := range tablesMap {
+ if nt != len(tmap) {
+ var mfds []storage.FileDesc
+ for num, present := range tmap {
if !present {
- missing = append(missing, &storage.FileInfo{Type: storage.TypeTable, Num: num})
+ mfds = append(mfds, storage.FileDesc{storage.TypeTable, num})
db.logf("db@janitor table missing @%d", num)
}
}
- return errors.NewErrCorrupted(nil, &errors.ErrMissingFiles{Files: missing})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &errors.ErrMissingFiles{Fds: mfds})
}
- db.logf("db@janitor F·%d G·%d", len(files), len(rem))
- for _, f := range rem {
- db.logf("db@janitor removing %s-%d", f.Type(), f.Num())
- if err := f.Remove(); err != nil {
+ db.logf("db@janitor F·%d G·%d", len(fds), len(rem))
+ for _, fd := range rem {
+ db.logf("db@janitor removing %s-%d", fd.Type, fd.Num)
+ if err := db.s.stor.Remove(fd); err != nil {
return err
}
}
diff --git a/leveldb/errors/errors.go b/leveldb/errors/errors.go
index dacbf13..9a0f6e2 100644
--- a/leveldb/errors/errors.go
+++ b/leveldb/errors/errors.go
@@ -29,21 +29,21 @@
// ErrCorrupted is the type that wraps errors that indicate corruption in
// the database.
type ErrCorrupted struct {
- File *storage.FileInfo
- Err error
+ Fd storage.FileDesc
+ Err error
}
func (e *ErrCorrupted) Error() string {
- if e.File != nil {
- return fmt.Sprintf("%v [file=%v]", e.Err, e.File)
+ if !e.Fd.Nil() {
+ return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
}
}
// NewErrCorrupted creates new ErrCorrupted error.
-func NewErrCorrupted(f storage.File, err error) error {
- return &ErrCorrupted{storage.NewFileInfo(f), err}
+func NewErrCorrupted(fd storage.FileDesc, err error) error {
+ return &ErrCorrupted{fd, err}
}
// IsCorrupted returns a boolean indicating whether the error is indicating
@@ -61,17 +61,17 @@
// ErrMissingFiles is the type that indicating a corruption due to missing
// files. ErrMissingFiles always wrapped with ErrCorrupted.
type ErrMissingFiles struct {
- Files []*storage.FileInfo
+ Fds []storage.FileDesc
}
func (e *ErrMissingFiles) Error() string { return "file missing" }
-// SetFile sets 'file info' of the given error with the given file.
+// SetFd sets 'file info' of the given error with the given file.
// Currently only ErrCorrupted is supported, otherwise will do nothing.
-func SetFile(err error, f storage.File) error {
+func SetFd(err error, fd storage.FileDesc) error {
switch x := err.(type) {
case *ErrCorrupted:
- x.File = storage.NewFileInfo(f)
+ x.Fd = fd
return x
}
return err
diff --git a/leveldb/journal/journal.go b/leveldb/journal/journal.go
index 6519ec6..891098b 100644
--- a/leveldb/journal/journal.go
+++ b/leveldb/journal/journal.go
@@ -83,6 +83,7 @@
"io"
"github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
@@ -165,7 +166,7 @@
r.dropper.Drop(&ErrCorrupted{n, reason})
}
if r.strict && !skip {
- r.err = errors.NewErrCorrupted(nil, &ErrCorrupted{n, reason})
+ r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
return r.err
}
return errSkip
diff --git a/leveldb/key.go b/leveldb/key.go
index 572ae81..675c865 100644
--- a/leveldb/key.go
+++ b/leveldb/key.go
@@ -11,6 +11,7 @@
"fmt"
"github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
type ErrIkeyCorrupted struct {
@@ -23,7 +24,7 @@
}
func newErrIkeyCorrupted(ikey []byte, reason string) error {
- return errors.NewErrCorrupted(nil, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
}
type kType int
diff --git a/leveldb/session.go b/leveldb/session.go
index ade94e5..a8d7b54 100644
--- a/leveldb/session.go
+++ b/leveldb/session.go
@@ -16,7 +16,6 @@
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/util"
)
type ErrManifestCorrupted struct {
@@ -28,28 +27,28 @@
return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
}
-func newErrManifestCorrupted(f storage.File, field, reason string) error {
- return errors.NewErrCorrupted(f, &ErrManifestCorrupted{field, reason})
+func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
+ return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
}
// session represent a persistent database session.
type session struct {
// Need 64-bit alignment.
- stNextFileNum uint64 // current unused file number
- stJournalNum uint64 // current journal file number; need external synchronization
- stPrevJournalNum uint64 // prev journal file number; no longer used; for compatibility with older version of leveldb
+ stNextFileNum int64 // current unused file number
+ stJournalNum int64 // current journal file number; need external synchronization
+ stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
+ stTempFileNum int64
stSeqNum uint64 // last mem compacted seq; need external synchronization
- stTempFileNum uint64
stor storage.Storage
- storLock util.Releaser
+ storLock storage.Lock
o *cachedOptions
icmp *iComparer
tops *tOps
manifest *journal.Writer
manifestWriter storage.Writer
- manifestFile storage.File
+ manifestFd storage.FileDesc
stCompPtrs []iKey // compaction pointers; need external synchronization
stVersion *version // current version
@@ -87,7 +86,6 @@
}
s.manifest = nil
s.manifestWriter = nil
- s.manifestFile = nil
s.stVersion = nil
}
@@ -108,18 +106,18 @@
if os.IsNotExist(err) {
// Don't return os.ErrNotExist if the underlying storage contains
// other files that belong to LevelDB. So the DB won't get trashed.
- if files, _ := s.stor.GetFiles(storage.TypeAll); len(files) > 0 {
- err = &errors.ErrCorrupted{File: &storage.FileInfo{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
+ if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
+ err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
}
}
}()
- m, err := s.stor.GetManifest()
+ fd, err := s.stor.GetMeta()
if err != nil {
return
}
- reader, err := m.Open()
+ reader, err := s.stor.Open(fd)
if err != nil {
return
}
@@ -129,7 +127,7 @@
// Options.
strict = s.o.GetStrict(opt.StrictManifest)
- jr = journal.NewReader(reader, dropper{s, m}, strict, true)
+ jr = journal.NewReader(reader, dropper{s, fd}, strict, true)
rec = &sessionRecord{}
staging = s.stVersion.newStaging()
)
@@ -141,7 +139,7 @@
err = nil
break
}
- return errors.SetFile(err, m)
+ return errors.SetFd(err, fd)
}
err = rec.decode(r)
@@ -153,11 +151,11 @@
// commit record to version staging
staging.commit(rec)
} else {
- err = errors.SetFile(err, m)
+ err = errors.SetFd(err, fd)
if strict || !errors.IsCorrupted(err) {
return
} else {
- s.logf("manifest error: %v (skipped)", errors.SetFile(err, m))
+ s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
}
}
rec.resetCompPtrs()
@@ -167,18 +165,18 @@
switch {
case !rec.has(recComparer):
- return newErrManifestCorrupted(m, "comparer", "missing")
+ return newErrManifestCorrupted(fd, "comparer", "missing")
case rec.comparer != s.icmp.uName():
- return newErrManifestCorrupted(m, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
+ return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
case !rec.has(recNextFileNum):
- return newErrManifestCorrupted(m, "next-file-num", "missing")
+ return newErrManifestCorrupted(fd, "next-file-num", "missing")
case !rec.has(recJournalNum):
- return newErrManifestCorrupted(m, "journal-file-num", "missing")
+ return newErrManifestCorrupted(fd, "journal-file-num", "missing")
case !rec.has(recSeqNum):
- return newErrManifestCorrupted(m, "seq-num", "missing")
+ return newErrManifestCorrupted(fd, "seq-num", "missing")
}
- s.manifestFile = m
+ s.manifestFd = fd
s.setVersion(staging.finish())
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go
index 417b8a1..471d68d 100644
--- a/leveldb/session_compaction.go
+++ b/leveldb/session_compaction.go
@@ -40,7 +40,7 @@
flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
rec.addTableFile(flushLevel, t)
- s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
+ s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
return flushLevel, nil
}
@@ -97,8 +97,8 @@
// and we must not pick one file and drop another older file if the
// two files overlap.
if !noLimit && sourceLevel > 0 {
- limit := uint64(v.s.o.GetCompactionSourceLimit(sourceLevel))
- total := uint64(0)
+ limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
+ total := int64(0)
for i, t := range t0 {
total += t.size
if total >= limit {
@@ -118,7 +118,7 @@
v: v,
sourceLevel: sourceLevel,
levels: [2]tFiles{t0, nil},
- maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(sourceLevel)),
+ maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
tPtrs: make([]int, len(v.levels)),
}
c.expand()
@@ -133,19 +133,19 @@
sourceLevel int
levels [2]tFiles
- maxGPOverlaps uint64
+ maxGPOverlaps int64
gp tFiles
gpi int
seenKey bool
- gpOverlappedBytes uint64
+ gpOverlappedBytes int64
imin, imax iKey
tPtrs []int
released bool
snapGPI int
snapSeenKey bool
- snapGPOverlappedBytes uint64
+ snapGPOverlappedBytes int64
snapTPtrs []int
}
@@ -172,7 +172,7 @@
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
- limit := uint64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
+ limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
vt0 := c.v.levels[c.sourceLevel]
vt1 := tFiles{}
if level := c.sourceLevel + 1; level < len(c.v.levels) {
diff --git a/leveldb/session_record.go b/leveldb/session_record.go
index a09b2bc..9802e1a 100644
--- a/leveldb/session_record.go
+++ b/leveldb/session_record.go
@@ -13,6 +13,7 @@
"strings"
"github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
type byteReader interface {
@@ -40,23 +41,23 @@
type atRecord struct {
level int
- num uint64
- size uint64
+ num int64
+ size int64
imin iKey
imax iKey
}
type dtRecord struct {
level int
- num uint64
+ num int64
}
type sessionRecord struct {
hasRec int
comparer string
- journalNum uint64
- prevJournalNum uint64
- nextFileNum uint64
+ journalNum int64
+ prevJournalNum int64
+ nextFileNum int64
seqNum uint64
compPtrs []cpRecord
addedTables []atRecord
@@ -75,17 +76,17 @@
p.comparer = name
}
-func (p *sessionRecord) setJournalNum(num uint64) {
+func (p *sessionRecord) setJournalNum(num int64) {
p.hasRec |= 1 << recJournalNum
p.journalNum = num
}
-func (p *sessionRecord) setPrevJournalNum(num uint64) {
+func (p *sessionRecord) setPrevJournalNum(num int64) {
p.hasRec |= 1 << recPrevJournalNum
p.prevJournalNum = num
}
-func (p *sessionRecord) setNextFileNum(num uint64) {
+func (p *sessionRecord) setNextFileNum(num int64) {
p.hasRec |= 1 << recNextFileNum
p.nextFileNum = num
}
@@ -105,13 +106,13 @@
p.compPtrs = p.compPtrs[:0]
}
-func (p *sessionRecord) addTable(level int, num, size uint64, imin, imax iKey) {
+func (p *sessionRecord) addTable(level int, num, size int64, imin, imax iKey) {
p.hasRec |= 1 << recAddTable
p.addedTables = append(p.addedTables, atRecord{level, num, size, imin, imax})
}
func (p *sessionRecord) addTableFile(level int, t *tFile) {
- p.addTable(level, t.file.Num(), t.size, t.imin, t.imax)
+ p.addTable(level, t.fd.Num, t.size, t.imin, t.imax)
}
func (p *sessionRecord) resetAddedTables() {
@@ -119,7 +120,7 @@
p.addedTables = p.addedTables[:0]
}
-func (p *sessionRecord) delTable(level int, num uint64) {
+func (p *sessionRecord) delTable(level int, num int64) {
p.hasRec |= 1 << recDelTable
p.deletedTables = append(p.deletedTables, dtRecord{level, num})
}
@@ -137,6 +138,13 @@
_, p.err = w.Write(p.scratch[:n])
}
+func (p *sessionRecord) putVarint(w io.Writer, x int64) {
+ if x < 0 {
+ panic("invalid negative value")
+ }
+ p.putUvarint(w, uint64(x))
+}
+
func (p *sessionRecord) putBytes(w io.Writer, x []byte) {
if p.err != nil {
return
@@ -156,11 +164,11 @@
}
if p.has(recJournalNum) {
p.putUvarint(w, recJournalNum)
- p.putUvarint(w, p.journalNum)
+ p.putVarint(w, p.journalNum)
}
if p.has(recNextFileNum) {
p.putUvarint(w, recNextFileNum)
- p.putUvarint(w, p.nextFileNum)
+ p.putVarint(w, p.nextFileNum)
}
if p.has(recSeqNum) {
p.putUvarint(w, recSeqNum)
@@ -174,13 +182,13 @@
for _, r := range p.deletedTables {
p.putUvarint(w, recDelTable)
p.putUvarint(w, uint64(r.level))
- p.putUvarint(w, r.num)
+ p.putVarint(w, r.num)
}
for _, r := range p.addedTables {
p.putUvarint(w, recAddTable)
p.putUvarint(w, uint64(r.level))
- p.putUvarint(w, r.num)
- p.putUvarint(w, r.size)
+ p.putVarint(w, r.num)
+ p.putVarint(w, r.size)
p.putBytes(w, r.imin)
p.putBytes(w, r.imax)
}
@@ -194,9 +202,9 @@
x, err := binary.ReadUvarint(r)
if err != nil {
if err == io.ErrUnexpectedEOF || (mayEOF == false && err == io.EOF) {
- p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "short read"})
+ p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, "short read"})
} else if strings.HasPrefix(err.Error(), "binary:") {
- p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, err.Error()})
+ p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, err.Error()})
} else {
p.err = err
}
@@ -209,6 +217,14 @@
return p.readUvarintMayEOF(field, r, false)
}
+func (p *sessionRecord) readVarint(field string, r io.ByteReader) int64 {
+ x := int64(p.readUvarintMayEOF(field, r, false))
+ if x < 0 {
+ p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, "invalid negative value"})
+ }
+ return x
+}
+
func (p *sessionRecord) readBytes(field string, r byteReader) []byte {
if p.err != nil {
return nil
@@ -221,7 +237,7 @@
_, p.err = io.ReadFull(r, x)
if p.err != nil {
if p.err == io.ErrUnexpectedEOF {
- p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "short read"})
+ p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, "short read"})
}
return nil
}
@@ -260,17 +276,17 @@
p.setComparer(string(x))
}
case recJournalNum:
- x := p.readUvarint("journal-num", br)
+ x := p.readVarint("journal-num", br)
if p.err == nil {
p.setJournalNum(x)
}
case recPrevJournalNum:
- x := p.readUvarint("prev-journal-num", br)
+ x := p.readVarint("prev-journal-num", br)
if p.err == nil {
p.setPrevJournalNum(x)
}
case recNextFileNum:
- x := p.readUvarint("next-file-num", br)
+ x := p.readVarint("next-file-num", br)
if p.err == nil {
p.setNextFileNum(x)
}
@@ -287,8 +303,8 @@
}
case recAddTable:
level := p.readLevel("add-table.level", br)
- num := p.readUvarint("add-table.num", br)
- size := p.readUvarint("add-table.size", br)
+ num := p.readVarint("add-table.num", br)
+ size := p.readVarint("add-table.size", br)
imin := p.readBytes("add-table.imin", br)
imax := p.readBytes("add-table.imax", br)
if p.err == nil {
@@ -296,7 +312,7 @@
}
case recDelTable:
level := p.readLevel("del-table.level", br)
- num := p.readUvarint("del-table.num", br)
+ num := p.readVarint("del-table.num", br)
if p.err == nil {
p.delTable(level, num)
}
diff --git a/leveldb/session_record_test.go b/leveldb/session_record_test.go
index be853a7..ce7c06f 100644
--- a/leveldb/session_record_test.go
+++ b/leveldb/session_record_test.go
@@ -31,9 +31,9 @@
}
func TestSessionRecord_EncodeDecode(t *testing.T) {
- big := uint64(1) << 50
+ big := int64(1) << 50
v := &sessionRecord{}
- i := uint64(0)
+ i := int64(0)
test := func() {
res, err := decodeEncode(v)
if err != nil {
@@ -47,16 +47,16 @@
for ; i < 4; i++ {
test()
v.addTable(3, big+300+i, big+400+i,
- newIkey([]byte("foo"), big+500+1, ktVal),
- newIkey([]byte("zoo"), big+600+1, ktDel))
+ newIkey([]byte("foo"), uint64(big+500+1), ktVal),
+ newIkey([]byte("zoo"), uint64(big+600+1), ktDel))
v.delTable(4, big+700+i)
- v.addCompPtr(int(i), newIkey([]byte("x"), big+900+1, ktVal))
+ v.addCompPtr(int(i), newIkey([]byte("x"), uint64(big+900+1), ktVal))
}
v.setComparer("foo")
v.setJournalNum(big + 100)
v.setPrevJournalNum(big + 99)
v.setNextFileNum(big + 200)
- v.setSeqNum(big + 1000)
+ v.setSeqNum(uint64(big + 1000))
test()
}
diff --git a/leveldb/session_util.go b/leveldb/session_util.go
index a7a480a..e4fa98d 100644
--- a/leveldb/session_util.go
+++ b/leveldb/session_util.go
@@ -17,15 +17,15 @@
// Logging.
type dropper struct {
- s *session
- file storage.File
+ s *session
+ fd storage.FileDesc
}
func (d dropper) Drop(err error) {
if e, ok := err.(*journal.ErrCorrupted); ok {
- d.s.logf("journal@drop %s-%d S·%s %q", d.file.Type(), d.file.Num(), shortenb(e.Size), e.Reason)
+ d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason)
} else {
- d.s.logf("journal@drop %s-%d %q", d.file.Type(), d.file.Num(), err)
+ d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
}
}
@@ -34,25 +34,9 @@
// File utils.
-func (s *session) getJournalFile(num uint64) storage.File {
- return s.stor.GetFile(num, storage.TypeJournal)
-}
-
-func (s *session) getTableFile(num uint64) storage.File {
- return s.stor.GetFile(num, storage.TypeTable)
-}
-
-func (s *session) getFiles(t storage.FileType) ([]storage.File, error) {
- return s.stor.GetFiles(t)
-}
-
-func (s *session) newTemp() storage.File {
- num := atomic.AddUint64(&s.stTempFileNum, 1) - 1
- return s.stor.GetFile(num, storage.TypeTemp)
-}
-
-func (s *session) tableFileFromRecord(r atRecord) *tFile {
- return newTableFile(s.getTableFile(r.num), r.size, r.imin, r.imax)
+func (s *session) newTemp() storage.FileDesc {
+ num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
+ return storage.FileDesc{storage.TypeTemp, num}
}
// Session state.
@@ -80,42 +64,42 @@
}
// Get current unused file number.
-func (s *session) nextFileNum() uint64 {
- return atomic.LoadUint64(&s.stNextFileNum)
+func (s *session) nextFileNum() int64 {
+ return atomic.LoadInt64(&s.stNextFileNum)
}
// Set current unused file number to num.
-func (s *session) setNextFileNum(num uint64) {
- atomic.StoreUint64(&s.stNextFileNum, num)
+func (s *session) setNextFileNum(num int64) {
+ atomic.StoreInt64(&s.stNextFileNum, num)
}
// Mark file number as used.
-func (s *session) markFileNum(num uint64) {
+func (s *session) markFileNum(num int64) {
nextFileNum := num + 1
for {
old, x := s.stNextFileNum, nextFileNum
if old > x {
x = old
}
- if atomic.CompareAndSwapUint64(&s.stNextFileNum, old, x) {
+ if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
break
}
}
}
// Allocate a file number.
-func (s *session) allocFileNum() uint64 {
- return atomic.AddUint64(&s.stNextFileNum, 1) - 1
+func (s *session) allocFileNum() int64 {
+ return atomic.AddInt64(&s.stNextFileNum, 1) - 1
}
// Reuse given file number.
-func (s *session) reuseFileNum(num uint64) {
+func (s *session) reuseFileNum(num int64) {
for {
old, x := s.stNextFileNum, num
if old != x+1 {
x = old
}
- if atomic.CompareAndSwapUint64(&s.stNextFileNum, old, x) {
+ if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
break
}
}
@@ -187,9 +171,8 @@
// Create a new manifest file; need external synchronization.
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
- num := s.allocFileNum()
- file := s.stor.GetFile(num, storage.TypeManifest)
- writer, err := file.Create()
+ fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()}
+ writer, err := s.stor.Create(fd)
if err != nil {
return
}
@@ -214,16 +197,16 @@
if s.manifestWriter != nil {
s.manifestWriter.Close()
}
- if s.manifestFile != nil {
- s.manifestFile.Remove()
+ if !s.manifestFd.Nil() {
+ s.stor.Remove(s.manifestFd)
}
- s.manifestFile = file
+ s.manifestFd = fd
s.manifestWriter = writer
s.manifest = jw
} else {
writer.Close()
- file.Remove()
- s.reuseFileNum(num)
+ s.stor.Remove(fd)
+ s.reuseFileNum(fd.Num)
}
}()
@@ -239,7 +222,7 @@
if err != nil {
return
}
- err = s.stor.SetManifest(file)
+ err = s.stor.SetMeta(fd)
return
}
diff --git a/leveldb/storage/file_storage.go b/leveldb/storage/file_storage.go
index 3650929..e70eb64 100644
--- a/leveldb/storage/file_storage.go
+++ b/leveldb/storage/file_storage.go
@@ -17,8 +17,6 @@
"strings"
"sync"
"time"
-
- "github.com/syndtr/goleveldb/leveldb/util"
)
var errFileOpen = errors.New("leveldb/storage: file still open")
@@ -59,8 +57,8 @@
}
// OpenFile returns a new filesytem-backed storage implementation with the given
-// path. This also hold a file lock, so any subsequent attempt to open the same
-// path will fail.
+// path. This also acquire a file lock, so any subsequent attempt to open the
+// same path will fail.
//
// The storage must be closed after use, by calling Close method.
func OpenFile(path string) (Storage, error) {
@@ -94,7 +92,7 @@
return fs, nil
}
-func (fs *fileStorage) Lock() (util.Releaser, error) {
+func (fs *fileStorage) Lock() (Lock, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
@@ -181,52 +179,51 @@
fs.doLog(time.Now(), str)
}
-func (fs *fileStorage) GetFile(num uint64, t FileType) File {
- return &file{fs: fs, num: num, t: t}
-}
+func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
+ if !FileDescOk(fd) {
+ return ErrInvalidFile
+ }
-func (fs *fileStorage) GetFiles(t FileType) (ff []File, err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
- return nil, ErrClosed
+ return ErrClosed
}
- dir, err := os.Open(fs.path)
- if err != nil {
- return
- }
- fnn, err := dir.Readdirnames(0)
- // Close the dir first before checking for Readdirnames error.
- if err := dir.Close(); err != nil {
- fs.log(fmt.Sprintf("close dir: %v", err))
- }
- if err != nil {
- return
- }
- f := &file{fs: fs}
- for _, fn := range fnn {
- if f.parse(fn) && (f.t&t) != 0 {
- ff = append(ff, f)
- f = &file{fs: fs}
+ defer func() {
+ if err != nil {
+ fs.log(fmt.Sprintf("CURRENT: %v", err))
}
+ }()
+ path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
+ w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return
}
- return
+ _, err = fmt.Fprintln(w, fsGenName(fd))
+ // Close the file first.
+ if cerr := w.Close(); cerr != nil {
+ fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, cerr))
+ }
+ if err != nil {
+ return
+ }
+ return rename(path, filepath.Join(fs.path, "CURRENT"))
}
-func (fs *fileStorage) GetManifest() (f File, err error) {
+func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
- return nil, ErrClosed
+ return FileDesc{}, ErrClosed
}
dir, err := os.Open(fs.path)
if err != nil {
return
}
- fnn, err := dir.Readdirnames(0)
+ names, err := dir.Readdirnames(0)
// Close the dir first before checking for Readdirnames error.
- if err := dir.Close(); err != nil {
- fs.log(fmt.Sprintf("close dir: %v", err))
+ if ce := dir.Close(); ce != nil {
+ fs.log(fmt.Sprintf("close dir: %v", ce))
}
if err != nil {
return
@@ -235,58 +232,64 @@
var rem []string
var pend bool
var cerr error
- for _, fn := range fnn {
- if strings.HasPrefix(fn, "CURRENT") {
- pend1 := len(fn) > 7
+ for _, name := range names {
+ if strings.HasPrefix(name, "CURRENT") {
+ pend1 := len(name) > 7
+ var pendNum int64
// Make sure it is valid name for a CURRENT file, otherwise skip it.
if pend1 {
- if fn[7] != '.' || len(fn) < 9 {
- fs.log(fmt.Sprintf("skipping %s: invalid file name", fn))
+ if name[7] != '.' || len(name) < 9 {
+ fs.log(fmt.Sprintf("skipping %s: invalid file name", name))
continue
}
- if _, e1 := strconv.ParseUint(fn[8:], 10, 0); e1 != nil {
- fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", fn, e1))
+ var e1 error
+ if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil {
+ fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1))
continue
}
}
- path := filepath.Join(fs.path, fn)
+ path := filepath.Join(fs.path, name)
r, e1 := os.OpenFile(path, os.O_RDONLY, 0)
if e1 != nil {
- return nil, e1
+ return FileDesc{}, e1
}
b, e1 := ioutil.ReadAll(r)
if e1 != nil {
r.Close()
- return nil, e1
+ return FileDesc{}, e1
}
- f1 := &file{fs: fs}
- if len(b) < 1 || b[len(b)-1] != '\n' || !f1.parse(string(b[:len(b)-1])) {
- fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", fn))
+ var fd1 FileDesc
+ if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) {
+ fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name))
if pend1 {
- rem = append(rem, fn)
+ rem = append(rem, name)
}
if !pend1 || cerr == nil {
+ metaFd, _ := fsParseName(name)
cerr = &ErrCorrupted{
- File: fsParseName(filepath.Base(fn)),
- Err: errors.New("leveldb/storage: corrupted or incomplete manifest file"),
+ Fd: metaFd,
+ Err: errors.New("leveldb/storage: corrupted or incomplete meta file"),
}
}
- } else if f != nil && f1.Num() < f.Num() {
- fs.log(fmt.Sprintf("skipping %s: obsolete", fn))
+ } else if pend1 && pendNum != fd1.Num {
+ fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num))
+ rem = append(rem, name)
+ } else if fd1.Num < fd.Num {
+ fs.log(fmt.Sprintf("skipping %s: obsolete", name))
if pend1 {
- rem = append(rem, fn)
+ rem = append(rem, name)
}
} else {
- f = f1
+ fd = fd1
pend = pend1
}
if err := r.Close(); err != nil {
- fs.log(fmt.Sprintf("close %s: %v", fn, err))
+ fs.log(fmt.Sprintf("close %s: %v", name, err))
}
}
}
// Don't remove any files if there is no valid CURRENT file.
- if f == nil {
+ if fd.Nil() {
if cerr != nil {
err = cerr
} else {
@@ -296,50 +299,127 @@
}
// Rename pending CURRENT file to an effective CURRENT.
if pend {
- path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), f.Num())
+ path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
- fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", f.Num(), err))
+ fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err))
}
}
// Remove obsolete or incomplete pending CURRENT files.
- for _, fn := range rem {
- path := filepath.Join(fs.path, fn)
+ for _, name := range rem {
+ path := filepath.Join(fs.path, name)
if err := os.Remove(path); err != nil {
- fs.log(fmt.Sprintf("remove %s: %v", fn, err))
+ fs.log(fmt.Sprintf("remove %s: %v", name, err))
}
}
return
}
-func (fs *fileStorage) SetManifest(f File) (err error) {
+func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if fs.open < 0 {
+ return nil, ErrClosed
+ }
+ dir, err := os.Open(fs.path)
+ if err != nil {
+ return
+ }
+ names, err := dir.Readdirnames(0)
+ // Close the dir first before checking for Readdirnames error.
+ if cerr := dir.Close(); cerr != nil {
+ fs.log(fmt.Sprintf("close dir: %v", cerr))
+ }
+ if err == nil {
+ for _, name := range names {
+ if fd, ok := fsParseName(name); ok && fd.Type&ft != 0 {
+ fds = append(fds, fd)
+ }
+ }
+ }
+ return
+}
+
+func (fs *fileStorage) Open(fd FileDesc) (Reader, error) {
+ if !FileDescOk(fd) {
+ return nil, ErrInvalidFile
+ }
+
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if fs.open < 0 {
+ return nil, ErrClosed
+ }
+ of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_RDONLY, 0)
+ if err != nil {
+ if fsHasOldName(fd) && os.IsNotExist(err) {
+ of, err = os.OpenFile(filepath.Join(fs.path, fsGenOldName(fd)), os.O_RDONLY, 0)
+ if err == nil {
+ goto ok
+ }
+ }
+ return nil, err
+ }
+ok:
+ fs.open++
+ return &fileWrap{File: of, fs: fs, fd: fd}, nil
+}
+
+func (fs *fileStorage) Create(fd FileDesc) (Writer, error) {
+ if !FileDescOk(fd) {
+ return nil, ErrInvalidFile
+ }
+
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if fs.open < 0 {
+ return nil, ErrClosed
+ }
+ of, err := os.OpenFile(filepath.Join(fs.path, fsGenName(fd)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return nil, err
+ }
+ fs.open++
+ return &fileWrap{File: of, fs: fs, fd: fd}, nil
+}
+
+func (fs *fileStorage) Remove(fd FileDesc) error {
+ if !FileDescOk(fd) {
+ return ErrInvalidFile
+ }
+
fs.mu.Lock()
defer fs.mu.Unlock()
if fs.open < 0 {
return ErrClosed
}
- f2, ok := f.(*file)
- if !ok || f2.t != TypeManifest {
+ err := os.Remove(filepath.Join(fs.path, fsGenName(fd)))
+ if err != nil {
+ if fsHasOldName(fd) && os.IsNotExist(err) {
+ if e1 := os.Remove(filepath.Join(fs.path, fsGenOldName(fd))); !os.IsNotExist(e1) {
+ fs.log(fmt.Sprintf("remove %s: %v (old name)", fd, err))
+ err = e1
+ }
+ } else {
+ fs.log(fmt.Sprintf("remove %s: %v", fd, err))
+ }
+ }
+ return err
+}
+
+func (fs *fileStorage) Rename(oldfd, newfd FileDesc) error {
+ if !FileDescOk(oldfd) || !FileDescOk(newfd) {
return ErrInvalidFile
}
- defer func() {
- if err != nil {
- fs.log(fmt.Sprintf("CURRENT: %v", err))
- }
- }()
- path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), f2.Num())
- w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return err
+ if oldfd == newfd {
+ return nil
}
- _, err = fmt.Fprintln(w, f2.name())
- // Close the file first.
- if err := w.Close(); err != nil {
- fs.log(fmt.Sprintf("close CURRENT.%d: %v", f2.num, err))
+
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if fs.open < 0 {
+ return ErrClosed
}
- if err != nil {
- return err
- }
- return rename(path, filepath.Join(fs.path, "CURRENT"))
+ return rename(filepath.Join(fs.path, fsGenName(oldfd)), filepath.Join(fs.path, fsGenName(newfd)))
}
func (fs *fileStorage) Close() error {
@@ -363,202 +443,96 @@
type fileWrap struct {
*os.File
- f *file
+ fs *fileStorage
+ fd FileDesc
+ closed bool
}
-func (fw fileWrap) Sync() error {
+func (fw *fileWrap) Sync() error {
if err := fw.File.Sync(); err != nil {
return err
}
- if fw.f.Type() == TypeManifest {
+ if fw.fd.Type == TypeManifest {
// Also sync parent directory if file type is manifest.
// See: https://code.google.com/p/leveldb/issues/detail?id=190.
- if err := syncDir(fw.f.fs.path); err != nil {
+ if err := syncDir(fw.fs.path); err != nil {
+ fw.fs.log(fmt.Sprintf("syncDir: %v", err))
return err
}
}
return nil
}
-func (fw fileWrap) Close() error {
- f := fw.f
- f.fs.mu.Lock()
- defer f.fs.mu.Unlock()
- if !f.open {
+func (fw *fileWrap) Close() error {
+ fw.fs.mu.Lock()
+ defer fw.fs.mu.Unlock()
+ if fw.closed {
return ErrClosed
}
- f.open = false
- f.fs.open--
+ fw.closed = true
+ fw.fs.open--
err := fw.File.Close()
if err != nil {
- f.fs.log(fmt.Sprintf("close %s.%d: %v", f.Type(), f.Num(), err))
+ fw.fs.log(fmt.Sprintf("close %s: %v", fw.fd, err))
}
return err
}
-type file struct {
- fs *fileStorage
- num uint64
- t FileType
- open bool
-}
-
-func (f *file) Open() (Reader, error) {
- f.fs.mu.Lock()
- defer f.fs.mu.Unlock()
- if f.fs.open < 0 {
- return nil, ErrClosed
- }
- if f.open {
- return nil, errFileOpen
- }
- of, err := os.OpenFile(f.path(), os.O_RDONLY, 0)
- if err != nil {
- if f.hasOldName() && os.IsNotExist(err) {
- of, err = os.OpenFile(f.oldPath(), os.O_RDONLY, 0)
- if err == nil {
- goto ok
- }
- }
- return nil, err
- }
-ok:
- f.open = true
- f.fs.open++
- return fileWrap{of, f}, nil
-}
-
-func (f *file) Create() (Writer, error) {
- f.fs.mu.Lock()
- defer f.fs.mu.Unlock()
- if f.fs.open < 0 {
- return nil, ErrClosed
- }
- if f.open {
- return nil, errFileOpen
- }
- of, err := os.OpenFile(f.path(), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return nil, err
- }
- f.open = true
- f.fs.open++
- return fileWrap{of, f}, nil
-}
-
-func (f *file) Replace(newfile File) error {
- f.fs.mu.Lock()
- defer f.fs.mu.Unlock()
- if f.fs.open < 0 {
- return ErrClosed
- }
- newfile2, ok := newfile.(*file)
- if !ok {
- return ErrInvalidFile
- }
- if f.open || newfile2.open {
- return errFileOpen
- }
- return rename(newfile2.path(), f.path())
-}
-
-func (f *file) Type() FileType {
- return f.t
-}
-
-func (f *file) Num() uint64 {
- return f.num
-}
-
-func (f *file) Remove() error {
- f.fs.mu.Lock()
- defer f.fs.mu.Unlock()
- if f.fs.open < 0 {
- return ErrClosed
- }
- if f.open {
- return errFileOpen
- }
- err := os.Remove(f.path())
- if err != nil {
- f.fs.log(fmt.Sprintf("remove %s.%d: %v", f.Type(), f.Num(), err))
- }
- // Also try remove file with old name, just in case.
- if f.hasOldName() {
- if e1 := os.Remove(f.oldPath()); !os.IsNotExist(e1) {
- f.fs.log(fmt.Sprintf("remove %s.%d: %v (old name)", f.Type(), f.Num(), err))
- err = e1
- }
- }
- return err
-}
-
-func (f *file) hasOldName() bool {
- return f.t == TypeTable
-}
-
-func (f *file) oldName() string {
- switch f.t {
- case TypeTable:
- return fmt.Sprintf("%06d.sst", f.num)
- }
- return f.name()
-}
-
-func (f *file) oldPath() string {
- return filepath.Join(f.fs.path, f.oldName())
-}
-
-func (f *file) name() string {
- switch f.t {
+func fsGenName(fd FileDesc) string {
+ switch fd.Type {
case TypeManifest:
- return fmt.Sprintf("MANIFEST-%06d", f.num)
+ return fmt.Sprintf("MANIFEST-%06d", fd.Num)
case TypeJournal:
- return fmt.Sprintf("%06d.log", f.num)
+ return fmt.Sprintf("%06d.log", fd.Num)
case TypeTable:
- return fmt.Sprintf("%06d.ldb", f.num)
+ return fmt.Sprintf("%06d.ldb", fd.Num)
case TypeTemp:
- return fmt.Sprintf("%06d.tmp", f.num)
+ return fmt.Sprintf("%06d.tmp", fd.Num)
default:
panic("invalid file type")
}
}
-func (f *file) path() string {
- return filepath.Join(f.fs.path, f.name())
+func fsHasOldName(fd FileDesc) bool {
+ return fd.Type == TypeTable
}
-func fsParseName(name string) *FileInfo {
- fi := &FileInfo{}
+func fsGenOldName(fd FileDesc) string {
+ switch fd.Type {
+ case TypeTable:
+ return fmt.Sprintf("%06d.sst", fd.Num)
+ }
+ return fsGenName(fd)
+}
+
+func fsParseName(name string) (fd FileDesc, ok bool) {
var tail string
- _, err := fmt.Sscanf(name, "%d.%s", &fi.Num, &tail)
+ _, err := fmt.Sscanf(name, "%d.%s", &fd.Num, &tail)
if err == nil {
switch tail {
case "log":
- fi.Type = TypeJournal
+ fd.Type = TypeJournal
case "ldb", "sst":
- fi.Type = TypeTable
+ fd.Type = TypeTable
case "tmp":
- fi.Type = TypeTemp
+ fd.Type = TypeTemp
default:
- return nil
+ return
}
- return fi
+ return fd, true
}
- n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fi.Num, &tail)
+ n, _ := fmt.Sscanf(name, "MANIFEST-%d%s", &fd.Num, &tail)
if n == 1 {
- fi.Type = TypeManifest
- return fi
+ fd.Type = TypeManifest
+ return fd, true
}
- return nil
+ return
}
-func (f *file) parse(name string) bool {
- fi := fsParseName(name)
- if fi == nil {
- return false
+func fsParseNamePtr(name string, fd *FileDesc) bool {
+ _fd, ok := fsParseName(name)
+ if fd != nil {
+ *fd = _fd
}
- f.t = fi.Type
- f.num = fi.Num
- return true
+ return ok
}
diff --git a/leveldb/storage/file_storage_test.go b/leveldb/storage/file_storage_test.go
index 92abcbb..f88011c 100644
--- a/leveldb/storage/file_storage_test.go
+++ b/leveldb/storage/file_storage_test.go
@@ -17,14 +17,14 @@
oldName []string
name string
ftype FileType
- num uint64
+ num int64
}{
{nil, "000100.log", TypeJournal, 100},
{nil, "000000.log", TypeJournal, 0},
{[]string{"000000.sst"}, "000000.ldb", TypeTable, 0},
{nil, "MANIFEST-000002", TypeManifest, 2},
{nil, "MANIFEST-000007", TypeManifest, 7},
- {nil, "18446744073709551615.log", TypeJournal, 18446744073709551615},
+ {nil, "9223372036854775807.log", TypeJournal, 9223372036854775807},
{nil, "000100.tmp", TypeTemp, 100},
}
@@ -55,9 +55,8 @@
func TestFileStorage_CreateFileName(t *testing.T) {
for _, c := range cases {
- f := &file{num: c.num, t: c.ftype}
- if f.name() != c.name {
- t.Errorf("invalid filename got '%s', want '%s'", f.name(), c.name)
+ if name := fsGenName(FileDesc{c.ftype, c.num}); name != c.name {
+ t.Errorf("invalid filename got '%s', want '%s'", name, c.name)
}
}
}
@@ -65,16 +64,16 @@
func TestFileStorage_ParseFileName(t *testing.T) {
for _, c := range cases {
for _, name := range append([]string{c.name}, c.oldName...) {
- f := new(file)
- if !f.parse(name) {
+ fd, ok := fsParseName(name)
+ if !ok {
t.Errorf("cannot parse filename '%s'", name)
continue
}
- if f.Type() != c.ftype {
- t.Errorf("filename '%s' invalid type got '%d', want '%d'", name, f.Type(), c.ftype)
+ if fd.Type != c.ftype {
+ t.Errorf("filename '%s' invalid type got '%d', want '%d'", name, fd.Type, c.ftype)
}
- if f.Num() != c.num {
- t.Errorf("filename '%s' invalid number got '%d', want '%d'", name, f.Num(), c.num)
+ if fd.Num != c.num {
+ t.Errorf("filename '%s' invalid number got '%d', want '%d'", name, fd.Num, c.num)
}
}
}
@@ -82,8 +81,7 @@
func TestFileStorage_InvalidFileName(t *testing.T) {
for _, name := range invalidCases {
- f := new(file)
- if f.parse(name) {
+ if fsParseNamePtr(name, nil) {
t.Errorf("filename '%s' should be invalid", name)
}
}
diff --git a/leveldb/storage/mem_storage.go b/leveldb/storage/mem_storage.go
index fc1c816..9b70e15 100644
--- a/leveldb/storage/mem_storage.go
+++ b/leveldb/storage/mem_storage.go
@@ -10,8 +10,6 @@
"bytes"
"os"
"sync"
-
- "github.com/syndtr/goleveldb/leveldb/util"
)
const typeShift = 3
@@ -32,10 +30,10 @@
// memStorage is a memory-backed storage.
type memStorage struct {
- mu sync.Mutex
- slock *memStorageLock
- files map[uint64]*memFile
- manifest *memFilePtr
+ mu sync.Mutex
+ slock *memStorageLock
+ files map[uint64]*memFile
+ meta FileDesc
}
// NewMemStorage returns a new memory-backed storage implementation.
@@ -45,7 +43,7 @@
}
}
-func (ms *memStorage) Lock() (util.Releaser, error) {
+func (ms *memStorage) Lock() (Lock, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.slock != nil {
@@ -57,147 +55,164 @@
func (*memStorage) Log(str string) {}
-func (ms *memStorage) GetFile(num uint64, t FileType) File {
- return &memFilePtr{ms: ms, num: num, t: t}
-}
-
-func (ms *memStorage) GetFiles(t FileType) ([]File, error) {
- ms.mu.Lock()
- var ff []File
- for x, _ := range ms.files {
- num, mt := x>>typeShift, FileType(x)&TypeAll
- if mt&t == 0 {
- continue
- }
- ff = append(ff, &memFilePtr{ms: ms, num: num, t: mt})
- }
- ms.mu.Unlock()
- return ff, nil
-}
-
-func (ms *memStorage) GetManifest() (File, error) {
- ms.mu.Lock()
- defer ms.mu.Unlock()
- if ms.manifest == nil {
- return nil, os.ErrNotExist
- }
- return ms.manifest, nil
-}
-
-func (ms *memStorage) SetManifest(f File) error {
- fm, ok := f.(*memFilePtr)
- if !ok || fm.t != TypeManifest {
+func (ms *memStorage) SetMeta(fd FileDesc) error {
+ if !FileDescOk(fd) {
return ErrInvalidFile
}
+
ms.mu.Lock()
- ms.manifest = fm
+ ms.meta = fd
ms.mu.Unlock()
return nil
}
-func (*memStorage) Close() error { return nil }
-
-type memReader struct {
- *bytes.Reader
- m *memFile
-}
-
-func (mr *memReader) Close() error {
- return mr.m.Close()
-}
-
-type memFile struct {
- bytes.Buffer
- ms *memStorage
- open bool
-}
-
-func (*memFile) Sync() error { return nil }
-func (m *memFile) Close() error {
- m.ms.mu.Lock()
- m.open = false
- m.ms.mu.Unlock()
- return nil
-}
-
-type memFilePtr struct {
- ms *memStorage
- num uint64
- t FileType
-}
-
-func (p *memFilePtr) x() uint64 {
- return p.Num()<<typeShift | uint64(p.Type())
-}
-
-func (p *memFilePtr) Open() (Reader, error) {
- ms := p.ms
+func (ms *memStorage) GetMeta() (FileDesc, error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- if m, exist := ms.files[p.x()]; exist {
+ if ms.meta.Nil() {
+ return FileDesc{}, os.ErrNotExist
+ }
+ return ms.meta, nil
+}
+
+func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
+ ms.mu.Lock()
+ var fds []FileDesc
+ for x, _ := range ms.files {
+ fd := unpackFile(x)
+ if fd.Type&ft != 0 {
+ fds = append(fds, fd)
+ }
+ }
+ ms.mu.Unlock()
+ return fds, nil
+}
+
+func (ms *memStorage) Open(fd FileDesc) (Reader, error) {
+ if !FileDescOk(fd) {
+ return nil, ErrInvalidFile
+ }
+
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ if m, exist := ms.files[packFile(fd)]; exist {
if m.open {
return nil, errFileOpen
}
m.open = true
- return &memReader{Reader: bytes.NewReader(m.Bytes()), m: m}, nil
+ return &memReader{Reader: bytes.NewReader(m.Bytes()), ms: ms, m: m}, nil
}
return nil, os.ErrNotExist
}
-func (p *memFilePtr) Create() (Writer, error) {
- ms := p.ms
+func (ms *memStorage) Create(fd FileDesc) (Writer, error) {
+ if !FileDescOk(fd) {
+ return nil, ErrInvalidFile
+ }
+
+ x := packFile(fd)
ms.mu.Lock()
defer ms.mu.Unlock()
- m, exist := ms.files[p.x()]
+ m, exist := ms.files[x]
if exist {
if m.open {
return nil, errFileOpen
}
m.Reset()
} else {
- m = &memFile{ms: ms}
- ms.files[p.x()] = m
+ m = &memFile{}
+ ms.files[x] = m
}
m.open = true
- return m, nil
+ return &memWriter{memFile: m, ms: ms}, nil
}
-func (p *memFilePtr) Replace(newfile File) error {
- p1, ok := newfile.(*memFilePtr)
- if !ok {
+func (ms *memStorage) Remove(fd FileDesc) error {
+ if !FileDescOk(fd) {
return ErrInvalidFile
}
- ms := p.ms
+
+ x := packFile(fd)
ms.mu.Lock()
defer ms.mu.Unlock()
- m1, exist := ms.files[p1.x()]
- if !exist {
- return os.ErrNotExist
- }
- m0, exist := ms.files[p.x()]
- if (exist && m0.open) || m1.open {
- return errFileOpen
- }
- delete(ms.files, p1.x())
- ms.files[p.x()] = m1
- return nil
-}
-
-func (p *memFilePtr) Type() FileType {
- return p.t
-}
-
-func (p *memFilePtr) Num() uint64 {
- return p.num
-}
-
-func (p *memFilePtr) Remove() error {
- ms := p.ms
- ms.mu.Lock()
- defer ms.mu.Unlock()
- if _, exist := ms.files[p.x()]; exist {
- delete(ms.files, p.x())
+ if _, exist := ms.files[x]; exist {
+ delete(ms.files, x)
return nil
}
return os.ErrNotExist
}
+
+func (ms *memStorage) Rename(oldfd, newfd FileDesc) error {
+ if FileDescOk(oldfd) || FileDescOk(newfd) {
+ return ErrInvalidFile
+ }
+ if oldfd == newfd {
+ return nil
+ }
+
+ oldx := packFile(oldfd)
+ newx := packFile(newfd)
+ ms.mu.Lock()
+ defer ms.mu.Unlock()
+ oldm, exist := ms.files[oldx]
+ if !exist {
+ return os.ErrNotExist
+ }
+ newm, exist := ms.files[newx]
+ if (exist && newm.open) || oldm.open {
+ return errFileOpen
+ }
+ delete(ms.files, oldx)
+ ms.files[newx] = oldm
+ return nil
+}
+
+func (*memStorage) Close() error { return nil }
+
+type memFile struct {
+ bytes.Buffer
+ open bool
+}
+
+type memReader struct {
+ *bytes.Reader
+ ms *memStorage
+ m *memFile
+ closed bool
+}
+
+func (mr *memReader) Close() error {
+ mr.ms.mu.Lock()
+ defer mr.ms.mu.Unlock()
+ if mr.closed {
+ return ErrClosed
+ }
+ mr.m.open = false
+ return nil
+}
+
+type memWriter struct {
+ *memFile
+ ms *memStorage
+ closed bool
+}
+
+func (*memWriter) Sync() error { return nil }
+
+func (mw *memWriter) Close() error {
+ mw.ms.mu.Lock()
+ defer mw.ms.mu.Unlock()
+ if mw.closed {
+ return ErrClosed
+ }
+ mw.memFile.open = false
+ return nil
+}
+
+func packFile(fd FileDesc) uint64 {
+ return uint64(fd.Num)<<typeShift | uint64(fd.Type)
+}
+
+func unpackFile(x uint64) FileDesc {
+ return FileDesc{FileType(x) & TypeAll, int64(x >> typeShift)}
+}
diff --git a/leveldb/storage/mem_storage_test.go b/leveldb/storage/mem_storage_test.go
index 23bb074..7295075 100644
--- a/leveldb/storage/mem_storage_test.go
+++ b/leveldb/storage/mem_storage_test.go
@@ -30,18 +30,17 @@
t.Fatal("storage lock failed(2): ", err)
}
- f := m.GetFile(1, TypeTable)
- if f.Num() != 1 && f.Type() != TypeTable {
- t.Fatal("invalid file number and type")
+ w, err := m.Create(FileDesc{TypeTable, 1})
+ if err != nil {
+ t.Fatal("Storage.Create: ", err)
}
- w, _ := f.Create()
w.Write([]byte("abc"))
w.Close()
- if ff, _ := m.GetFiles(TypeAll); len(ff) != 1 {
+ if fds, _ := m.List(TypeAll); len(fds) != 1 {
t.Fatal("invalid GetFiles len")
}
buf := new(bytes.Buffer)
- r, err := f.Open()
+ r, err := m.Open(FileDesc{TypeTable, 1})
if err != nil {
t.Fatal("Open: got error: ", err)
}
@@ -50,17 +49,17 @@
if got := buf.String(); got != "abc" {
t.Fatalf("Read: invalid value, want=abc got=%s", got)
}
- if _, err := f.Open(); err != nil {
+ if _, err := m.Open(FileDesc{TypeTable, 1}); err != nil {
t.Fatal("Open: got error: ", err)
}
- if _, err := m.GetFile(1, TypeTable).Open(); err == nil {
+ if _, err := m.Open(FileDesc{TypeTable, 1}); err == nil {
t.Fatal("expecting error")
}
- f.Remove()
- if ff, _ := m.GetFiles(TypeAll); len(ff) != 0 {
- t.Fatal("invalid GetFiles len", len(ff))
+ m.Remove(FileDesc{TypeTable, 1})
+ if fds, _ := m.List(TypeAll); len(fds) != 0 {
+ t.Fatal("invalid GetFiles len", len(fds))
}
- if _, err := f.Open(); err == nil {
+ if _, err := m.Open(FileDesc{TypeTable, 1}); err == nil {
t.Fatal("expecting error")
}
}
diff --git a/leveldb/storage/storage.go b/leveldb/storage/storage.go
index a4e037c..9b30b67 100644
--- a/leveldb/storage/storage.go
+++ b/leveldb/storage/storage.go
@@ -15,7 +15,7 @@
"github.com/syndtr/goleveldb/leveldb/util"
)
-type FileType uint32
+type FileType int
const (
TypeManifest FileType = 1 << iota
@@ -50,13 +50,13 @@
// a file. Package storage has its own type instead of using
// errors.ErrCorrupted to prevent circular import.
type ErrCorrupted struct {
- File *FileInfo
- Err error
+ Fd FileDesc
+ Err error
}
func (e *ErrCorrupted) Error() string {
- if e.File != nil {
- return fmt.Sprintf("%v [file=%v]", e.Err, e.File)
+ if !e.Fd.Nil() {
+ return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
}
@@ -83,31 +83,47 @@
Syncer
}
-// File is the file. A file instance must be goroutine-safe.
-type File interface {
- // Open opens the file for read. Returns os.ErrNotExist error
- // if the file does not exist.
- // Returns ErrClosed if the underlying storage is closed.
- Open() (r Reader, err error)
+type Lock interface {
+ util.Releaser
+}
- // Create creates the file for writting. Truncate the file if
- // already exist.
- // Returns ErrClosed if the underlying storage is closed.
- Create() (w Writer, err error)
+// FileDesc is a file descriptor.
+type FileDesc struct {
+ Type FileType
+ Num int64
+}
- // Replace replaces file with newfile.
- // Returns ErrClosed if the underlying storage is closed.
- Replace(newfile File) error
+func (fd FileDesc) String() string {
+ switch fd.Type {
+ case TypeManifest:
+ return fmt.Sprintf("MANIFEST-%06d", fd.Num)
+ case TypeJournal:
+ return fmt.Sprintf("%06d.log", fd.Num)
+ case TypeTable:
+ return fmt.Sprintf("%06d.ldb", fd.Num)
+ case TypeTemp:
+ return fmt.Sprintf("%06d.tmp", fd.Num)
+ default:
+ return fmt.Sprintf("%#x-%d", fd.Type, fd.Num)
+ }
+}
- // Type returns the file type
- Type() FileType
+// Nil returns true if fd == (FileDesc{}).
+func (fd FileDesc) Nil() bool {
+ return fd == (FileDesc{})
+}
- // Num returns the file number.
- Num() uint64
-
- // Remove removes the file.
- // Returns ErrClosed if the underlying storage is closed.
- Remove() error
+// FileDescOk returns true if fd is a valid file descriptor.
+func FileDescOk(fd FileDesc) bool {
+ switch fd.Type {
+ case TypeManifest:
+ case TypeJournal:
+ case TypeTable:
+ case TypeTemp:
+ default:
+ return false
+ }
+ return fd.Num >= 0
}
// Storage is the storage. A storage instance must be goroutine-safe.
@@ -115,59 +131,47 @@
// Lock locks the storage. Any subsequent attempt to call Lock will fail
// until the last lock released.
// After use the caller should call the Release method.
- Lock() (l util.Releaser, err error)
+ Lock() (Lock, error)
- // Log logs a string. This is used for logging. An implementation
- // may write to a file, stdout or simply do nothing.
+ // Log logs a string. This is used for logging.
+ // An implementation may write to a file, stdout or simply do nothing.
Log(str string)
- // GetFile returns a file for the given number and type. GetFile will never
- // returns nil, even if the underlying storage is closed.
- GetFile(num uint64, t FileType) File
+ // SetMeta sets to point to the given fd, which then can be acquired using
+ // GetMeta method.
+ // SetMeta should be implemented in such way that changes should happened
+ // atomically.
+ SetMeta(fd FileDesc) error
- // GetFiles returns a slice of files that match the given file types.
+ // GetManifest returns a manifest file.
+ // Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd
+ // that doesn't exist.
+ GetMeta() (FileDesc, error)
+
+ // List returns fds that match the given file types.
// The file types may be OR'ed together.
- GetFiles(t FileType) ([]File, error)
+ List(ft FileType) ([]FileDesc, error)
- // GetManifest returns a manifest file. Returns os.ErrNotExist if manifest
- // file does not exist.
- GetManifest() (File, error)
+ // Open opens file with the given fd read-only.
+ // Returns os.ErrNotExist error if the file does not exist.
+ // Returns ErrClosed if the underlying storage is closed.
+ Open(fd FileDesc) (Reader, error)
- // SetManifest sets the given file as manifest file. The given file should
- // be a manifest file type or error will be returned.
- SetManifest(f File) error
+ // Create creates file with the given fd, truncate if already exist and
+ // opens write-only.
+ // Returns ErrClosed if the underlying storage is closed.
+ Create(fd FileDesc) (Writer, error)
- // Close closes the storage. It is valid to call Close multiple times.
- // Other methods should not be called after the storage has been closed.
+ // Remove removes file with the given fd.
+ // Returns ErrClosed if the underlying storage is closed.
+ Remove(fd FileDesc) error
+
+ // Rename renames file from oldfd to newfd.
+ // Returns ErrClosed if the underlying storage is closed.
+ Rename(oldfd, newfd FileDesc) error
+
+ // Close closes the storage.
+ // It is valid to call Close multiple times. Other methods should not be
+ // called after the storage has been closed.
Close() error
}
-
-// FileInfo wraps basic file info.
-type FileInfo struct {
- Type FileType
- Num uint64
-}
-
-func (fi FileInfo) String() string {
- switch fi.Type {
- case TypeManifest:
- return fmt.Sprintf("MANIFEST-%06d", fi.Num)
- case TypeJournal:
- return fmt.Sprintf("%06d.log", fi.Num)
- case TypeTable:
- return fmt.Sprintf("%06d.ldb", fi.Num)
- case TypeTemp:
- return fmt.Sprintf("%06d.tmp", fi.Num)
- default:
- return fmt.Sprintf("%#x-%d", fi.Type, fi.Num)
- }
-}
-
-// NewFileInfo creates new FileInfo from the given File. It will returns nil
-// if File is nil.
-func NewFileInfo(f File) *FileInfo {
- if f == nil {
- return nil
- }
- return &FileInfo{f.Type(), f.Num()}
-}
diff --git a/leveldb/storage_test.go b/leveldb/storage_test.go
deleted file mode 100644
index 08be0ba..0000000
--- a/leveldb/storage_test.go
+++ /dev/null
@@ -1,549 +0,0 @@
-// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
-// All rights reserved.
-//
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENE file.
-
-package leveldb
-
-import (
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "os"
- "path/filepath"
- "sync"
- "testing"
-
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/util"
-)
-
-const typeShift = 4
-
-var (
- tsErrInvalidFile = errors.New("leveldb.testStorage: invalid file for argument")
- tsErrFileOpen = errors.New("leveldb.testStorage: file still open")
-)
-
-var (
- tsFSEnv = os.Getenv("GOLEVELDB_USEFS")
- tsTempdir = os.Getenv("GOLEVELDB_TEMPDIR")
- tsKeepFS = tsFSEnv == "2"
- tsFS = tsKeepFS || tsFSEnv == "" || tsFSEnv == "1"
- tsMU = &sync.Mutex{}
- tsNum = 0
-)
-
-type tsOp uint
-
-const (
- tsOpOpen tsOp = iota
- tsOpCreate
- tsOpReplace
- tsOpRemove
- tsOpRead
- tsOpReadAt
- tsOpWrite
- tsOpSync
-
- tsOpNum
-)
-
-type tsLock struct {
- ts *testStorage
- r util.Releaser
-}
-
-func (l tsLock) Release() {
- l.r.Release()
- l.ts.t.Log("I: storage lock released")
-}
-
-type tsReader struct {
- tf tsFile
- storage.Reader
-}
-
-func (tr tsReader) Read(b []byte) (n int, err error) {
- ts := tr.tf.ts
- ts.countRead(tr.tf.Type())
- if tr.tf.shouldErrLocked(tsOpRead) {
- return 0, errors.New("leveldb.testStorage: emulated read error")
- }
- n, err = tr.Reader.Read(b)
- if err != nil && err != io.EOF {
- ts.t.Errorf("E: read error, num=%d type=%v n=%d: %v", tr.tf.Num(), tr.tf.Type(), n, err)
- }
- return
-}
-
-func (tr tsReader) ReadAt(b []byte, off int64) (n int, err error) {
- ts := tr.tf.ts
- ts.countRead(tr.tf.Type())
- if tr.tf.shouldErrLocked(tsOpReadAt) {
- return 0, errors.New("leveldb.testStorage: emulated readAt error")
- }
- n, err = tr.Reader.ReadAt(b, off)
- if err != nil && err != io.EOF {
- ts.t.Errorf("E: readAt error, num=%d type=%v off=%d n=%d: %v", tr.tf.Num(), tr.tf.Type(), off, n, err)
- }
- return
-}
-
-func (tr tsReader) Close() (err error) {
- err = tr.Reader.Close()
- tr.tf.close("reader", err)
- return
-}
-
-type tsWriter struct {
- tf tsFile
- storage.Writer
-}
-
-func (tw tsWriter) Write(b []byte) (n int, err error) {
- if tw.tf.shouldErrLocked(tsOpWrite) {
- return 0, errors.New("leveldb.testStorage: emulated write error")
- }
- n, err = tw.Writer.Write(b)
- if err != nil {
- tw.tf.ts.t.Errorf("E: write error, num=%d type=%v n=%d: %v", tw.tf.Num(), tw.tf.Type(), n, err)
- }
- return
-}
-
-func (tw tsWriter) Sync() (err error) {
- ts := tw.tf.ts
- ts.mu.Lock()
- for ts.emuDelaySync&tw.tf.Type() != 0 {
- ts.cond.Wait()
- }
- ts.mu.Unlock()
- if tw.tf.shouldErrLocked(tsOpSync) {
- return errors.New("leveldb.testStorage: emulated sync error")
- }
- err = tw.Writer.Sync()
- if err != nil {
- tw.tf.ts.t.Errorf("E: sync error, num=%d type=%v: %v", tw.tf.Num(), tw.tf.Type(), err)
- }
- return
-}
-
-func (tw tsWriter) Close() (err error) {
- err = tw.Writer.Close()
- tw.tf.close("writer", err)
- return
-}
-
-type tsFile struct {
- ts *testStorage
- storage.File
-}
-
-func (tf tsFile) x() uint64 {
- return tf.Num()<<typeShift | uint64(tf.Type())
-}
-
-func (tf tsFile) shouldErr(op tsOp) bool {
- return tf.ts.shouldErr(tf, op)
-}
-
-func (tf tsFile) shouldErrLocked(op tsOp) bool {
- tf.ts.mu.Lock()
- defer tf.ts.mu.Unlock()
- return tf.shouldErr(op)
-}
-
-func (tf tsFile) checkOpen(m string) error {
- ts := tf.ts
- if writer, ok := ts.opens[tf.x()]; ok {
- if writer {
- ts.t.Errorf("E: cannot %s file, num=%d type=%v: a writer still open", m, tf.Num(), tf.Type())
- } else {
- ts.t.Errorf("E: cannot %s file, num=%d type=%v: a reader still open", m, tf.Num(), tf.Type())
- }
- return tsErrFileOpen
- }
- return nil
-}
-
-func (tf tsFile) close(m string, err error) {
- ts := tf.ts
- ts.mu.Lock()
- defer ts.mu.Unlock()
- if _, ok := ts.opens[tf.x()]; !ok {
- ts.t.Errorf("E: %s: redudant file closing, num=%d type=%v", m, tf.Num(), tf.Type())
- } else if err == nil {
- ts.t.Logf("I: %s: file closed, num=%d type=%v", m, tf.Num(), tf.Type())
- }
- delete(ts.opens, tf.x())
- if err != nil {
- ts.t.Errorf("E: %s: cannot close file, num=%d type=%v: %v", m, tf.Num(), tf.Type(), err)
- }
-}
-
-func (tf tsFile) Open() (r storage.Reader, err error) {
- ts := tf.ts
- ts.mu.Lock()
- defer ts.mu.Unlock()
- err = tf.checkOpen("open")
- if err != nil {
- return
- }
- if tf.shouldErr(tsOpOpen) {
- err = errors.New("leveldb.testStorage: emulated open error")
- return
- }
- r, err = tf.File.Open()
- if err != nil {
- if ts.ignoreOpenErr&tf.Type() != 0 {
- ts.t.Logf("I: cannot open file, num=%d type=%v: %v (ignored)", tf.Num(), tf.Type(), err)
- } else {
- ts.t.Errorf("E: cannot open file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
- }
- } else {
- ts.t.Logf("I: file opened, num=%d type=%v", tf.Num(), tf.Type())
- ts.opens[tf.x()] = false
- r = tsReader{tf, r}
- }
- return
-}
-
-func (tf tsFile) Create() (w storage.Writer, err error) {
- ts := tf.ts
- ts.mu.Lock()
- defer ts.mu.Unlock()
- err = tf.checkOpen("create")
- if err != nil {
- return
- }
- if tf.shouldErr(tsOpCreate) {
- err = errors.New("leveldb.testStorage: emulated create error")
- return
- }
- w, err = tf.File.Create()
- if err != nil {
- ts.t.Errorf("E: cannot create file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
- } else {
- ts.t.Logf("I: file created, num=%d type=%v", tf.Num(), tf.Type())
- ts.opens[tf.x()] = true
- w = tsWriter{tf, w}
- }
- return
-}
-
-func (tf tsFile) Replace(newfile storage.File) (err error) {
- ts := tf.ts
- ts.mu.Lock()
- defer ts.mu.Unlock()
- err = tf.checkOpen("replace")
- if err != nil {
- return
- }
- if tf.shouldErr(tsOpReplace) {
- err = errors.New("leveldb.testStorage: emulated create error")
- return
- }
- err = tf.File.Replace(newfile.(tsFile).File)
- if err != nil {
- ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
- } else {
- ts.t.Logf("I: file replace, num=%d type=%v", tf.Num(), tf.Type())
- }
- return
-}
-
-func (tf tsFile) Remove() (err error) {
- ts := tf.ts
- ts.mu.Lock()
- defer ts.mu.Unlock()
- err = tf.checkOpen("remove")
- if err != nil {
- return
- }
- if tf.shouldErr(tsOpRemove) {
- err = errors.New("leveldb.testStorage: emulated create error")
- return
- }
- err = tf.File.Remove()
- if err != nil {
- ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
- } else {
- ts.t.Logf("I: file removed, num=%d type=%v", tf.Num(), tf.Type())
- }
- return
-}
-
-type testStorage struct {
- t *testing.T
- storage.Storage
- closeFn func() error
-
- mu sync.Mutex
- cond sync.Cond
- // Open files, true=writer, false=reader
- opens map[uint64]bool
- emuDelaySync storage.FileType
- ignoreOpenErr storage.FileType
- readCnt uint64
- readCntEn storage.FileType
-
- emuErr [tsOpNum]storage.FileType
- emuErrOnce [tsOpNum]storage.FileType
- emuRandErr [tsOpNum]storage.FileType
- emuRandErrProb int
- emuErrOnceMap map[uint64]uint
- emuRandRand *rand.Rand
-}
-
-func (ts *testStorage) shouldErr(tf tsFile, op tsOp) bool {
- if ts.emuErr[op]&tf.Type() != 0 {
- return true
- } else if ts.emuRandErr[op]&tf.Type() != 0 || ts.emuErrOnce[op]&tf.Type() != 0 {
- sop := uint(1) << op
- eop := ts.emuErrOnceMap[tf.x()]
- if eop&sop == 0 && (ts.emuRandRand.Int()%ts.emuRandErrProb == 0 || ts.emuErrOnce[op]&tf.Type() != 0) {
- ts.emuErrOnceMap[tf.x()] = eop | sop
- ts.t.Logf("I: emulated error: file=%d type=%v op=%v", tf.Num(), tf.Type(), op)
- return true
- }
- }
- return false
-}
-
-func (ts *testStorage) SetEmuErr(t storage.FileType, ops ...tsOp) {
- ts.mu.Lock()
- for _, op := range ops {
- ts.emuErr[op] = t
- }
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) SetEmuErrOnce(t storage.FileType, ops ...tsOp) {
- ts.mu.Lock()
- for _, op := range ops {
- ts.emuErrOnce[op] = t
- }
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) SetEmuRandErr(t storage.FileType, ops ...tsOp) {
- ts.mu.Lock()
- for _, op := range ops {
- ts.emuRandErr[op] = t
- }
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) SetEmuRandErrProb(prob int) {
- ts.mu.Lock()
- ts.emuRandErrProb = prob
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) DelaySync(t storage.FileType) {
- ts.mu.Lock()
- ts.emuDelaySync |= t
- ts.cond.Broadcast()
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) ReleaseSync(t storage.FileType) {
- ts.mu.Lock()
- ts.emuDelaySync &= ^t
- ts.cond.Broadcast()
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) ReadCounter() uint64 {
- ts.mu.Lock()
- defer ts.mu.Unlock()
- return ts.readCnt
-}
-
-func (ts *testStorage) ResetReadCounter() {
- ts.mu.Lock()
- ts.readCnt = 0
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) SetReadCounter(t storage.FileType) {
- ts.mu.Lock()
- ts.readCntEn = t
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) countRead(t storage.FileType) {
- ts.mu.Lock()
- if ts.readCntEn&t != 0 {
- ts.readCnt++
- }
- ts.mu.Unlock()
-}
-
-func (ts *testStorage) SetIgnoreOpenErr(t storage.FileType) {
- ts.ignoreOpenErr = t
-}
-
-func (ts *testStorage) Lock() (r util.Releaser, err error) {
- r, err = ts.Storage.Lock()
- if err != nil {
- ts.t.Logf("W: storage locking failed: %v", err)
- } else {
- ts.t.Log("I: storage locked")
- r = tsLock{ts, r}
- }
- return
-}
-
-func (ts *testStorage) Log(str string) {
- ts.t.Log("L: " + str)
- ts.Storage.Log(str)
-}
-
-func (ts *testStorage) GetFile(num uint64, t storage.FileType) storage.File {
- return tsFile{ts, ts.Storage.GetFile(num, t)}
-}
-
-func (ts *testStorage) GetFiles(t storage.FileType) (ff []storage.File, err error) {
- ff0, err := ts.Storage.GetFiles(t)
- if err != nil {
- ts.t.Errorf("E: get files failed: %v", err)
- return
- }
- ff = make([]storage.File, len(ff0))
- for i, f := range ff0 {
- ff[i] = tsFile{ts, f}
- }
- ts.t.Logf("I: get files, type=0x%x count=%d", int(t), len(ff))
- return
-}
-
-func (ts *testStorage) GetManifest() (f storage.File, err error) {
- f0, err := ts.Storage.GetManifest()
- if err != nil {
- if !os.IsNotExist(err) {
- ts.t.Errorf("E: get manifest failed: %v", err)
- }
- return
- }
- f = tsFile{ts, f0}
- ts.t.Logf("I: get manifest, num=%d", f.Num())
- return
-}
-
-func (ts *testStorage) SetManifest(f storage.File) error {
- tf, ok := f.(tsFile)
- if !ok {
- ts.t.Error("E: set manifest failed: type assertion failed")
- return tsErrInvalidFile
- } else if tf.Type() != storage.TypeManifest {
- ts.t.Errorf("E: set manifest failed: invalid file type: %s", tf.Type())
- return tsErrInvalidFile
- }
- err := ts.Storage.SetManifest(tf.File)
- if err != nil {
- ts.t.Errorf("E: set manifest failed: %v", err)
- } else {
- ts.t.Logf("I: set manifest, num=%d", tf.Num())
- }
- return err
-}
-
-func (ts *testStorage) Close() error {
- ts.CloseCheck()
- err := ts.Storage.Close()
- if err != nil {
- ts.t.Errorf("E: closing storage failed: %v", err)
- } else {
- ts.t.Log("I: storage closed")
- }
- if ts.closeFn != nil {
- if err := ts.closeFn(); err != nil {
- ts.t.Errorf("E: close function: %v", err)
- }
- }
- return err
-}
-
-func (ts *testStorage) CloseCheck() {
- ts.mu.Lock()
- if len(ts.opens) == 0 {
- ts.t.Log("I: all files are closed")
- } else {
- ts.t.Errorf("E: %d files still open", len(ts.opens))
- for x, writer := range ts.opens {
- num, tt := x>>typeShift, storage.FileType(x)&storage.TypeAll
- ts.t.Errorf("E: * num=%d type=%v writer=%v", num, tt, writer)
- }
- }
- ts.mu.Unlock()
-}
-
-func newTestStorage(t *testing.T) *testStorage {
- var stor storage.Storage
- var closeFn func() error
- if tsFS {
- for {
- tsMU.Lock()
- num := tsNum
- tsNum++
- tsMU.Unlock()
- tempdir := tsTempdir
- if tempdir == "" {
- tempdir = os.TempDir()
- }
- path := filepath.Join(tempdir, fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
- if _, err := os.Stat(path); err != nil {
- stor, err = storage.OpenFile(path)
- if err != nil {
- t.Fatalf("F: cannot create storage: %v", err)
- }
- t.Logf("I: storage created: %s", path)
- closeFn = func() error {
- for _, name := range []string{"LOG.old", "LOG"} {
- f, err := os.Open(filepath.Join(path, name))
- if err != nil {
- continue
- }
- if log, err := ioutil.ReadAll(f); err != nil {
- t.Logf("---------------------- %s ----------------------", name)
- t.Logf("cannot read log: %v", err)
- t.Logf("---------------------- %s ----------------------", name)
- } else if len(log) > 0 {
- t.Logf("---------------------- %s ----------------------\n%s", name, string(log))
- t.Logf("---------------------- %s ----------------------", name)
- }
- f.Close()
- }
- if t.Failed() {
- t.Logf("testing failed, test DB preserved at %s", path)
- return nil
- }
- if tsKeepFS {
- return nil
- }
- return os.RemoveAll(path)
- }
-
- break
- }
- }
- } else {
- stor = storage.NewMemStorage()
- }
- ts := &testStorage{
- t: t,
- Storage: stor,
- closeFn: closeFn,
- opens: make(map[uint64]bool),
- emuErrOnceMap: make(map[uint64]uint),
- emuRandErrProb: 0x999,
- emuRandRand: rand.New(rand.NewSource(0xfacedead)),
- }
- ts.cond.L = &ts.mu
- return ts
-}
diff --git a/leveldb/table.go b/leveldb/table.go
index 37be47a..40ca132 100644
--- a/leveldb/table.go
+++ b/leveldb/table.go
@@ -21,9 +21,9 @@
// tFile holds basic information about a table.
type tFile struct {
- file storage.File
+ fd storage.FileDesc
seekLeft int32
- size uint64
+ size int64
imin, imax iKey
}
@@ -48,9 +48,9 @@
}
// Creates new tFile.
-func newTableFile(file storage.File, size uint64, imin, imax iKey) *tFile {
+func newTableFile(fd storage.FileDesc, size int64, imin, imax iKey) *tFile {
f := &tFile{
- file: file,
+ fd: fd,
size: size,
imin: imin,
imax: imax,
@@ -77,6 +77,10 @@
return f
}
+func tableFileFromRecord(r atRecord) *tFile {
+ return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax)
+}
+
// tFiles hold multiple tFile.
type tFiles []*tFile
@@ -89,7 +93,7 @@
if i != 0 {
x += ", "
}
- x += fmt.Sprint(f.file.Num())
+ x += fmt.Sprint(f.fd.Num)
}
x += " ]"
return x
@@ -101,7 +105,7 @@
a, b := tf[i], tf[j]
n := icmp.Compare(a.imin, b.imin)
if n == 0 {
- return a.file.Num() < b.file.Num()
+ return a.fd.Num < b.fd.Num
}
return n < 0
}
@@ -109,7 +113,7 @@
// Returns true if i file number is greater than j.
// This used for sort by file number in descending order.
func (tf tFiles) lessByNum(i, j int) bool {
- return tf[i].file.Num() > tf[j].file.Num()
+ return tf[i].fd.Num > tf[j].fd.Num
}
// Sorts tables by key in ascending order.
@@ -123,7 +127,7 @@
}
// Returns sum of all tables size.
-func (tf tFiles) size() (sum uint64) {
+func (tf tFiles) size() (sum int64) {
for _, t := range tf {
sum += t.size
}
@@ -295,16 +299,16 @@
// Creates an empty table and returns table writer.
func (t *tOps) create() (*tWriter, error) {
- file := t.s.getTableFile(t.s.allocFileNum())
- fw, err := file.Create()
+ fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()}
+ fw, err := t.s.stor.Create(fd)
if err != nil {
return nil, err
}
return &tWriter{
- t: t,
- file: file,
- w: fw,
- tw: table.NewWriter(fw, t.s.o.Options),
+ t: t,
+ fd: fd,
+ w: fw,
+ tw: table.NewWriter(fw, t.s.o.Options),
}, nil
}
@@ -340,21 +344,20 @@
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
- num := f.file.Num()
- ch = t.cache.Get(0, num, func() (size int, value cache.Value) {
+ ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
var r storage.Reader
- r, err = f.file.Open()
+ r, err = t.s.stor.Open(f.fd)
if err != nil {
return 0, nil
}
var bcache *cache.CacheGetter
if t.bcache != nil {
- bcache = &cache.CacheGetter{Cache: t.bcache, NS: num}
+ bcache = &cache.CacheGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
}
var tr *table.Reader
- tr, err = table.NewReader(r, int64(f.size), storage.NewFileInfo(f.file), bcache, t.bpool, t.s.o.Options)
+ tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
if err != nil {
r.Close()
return 0, nil
@@ -414,15 +417,14 @@
// Removes table from persistent storage. It waits until
// no one use the the table.
func (t *tOps) remove(f *tFile) {
- num := f.file.Num()
- t.cache.Delete(0, num, func() {
- if err := f.file.Remove(); err != nil {
- t.s.logf("table@remove removing @%d %q", num, err)
+ t.cache.Delete(0, uint64(f.fd.Num), func() {
+ if err := t.s.stor.Remove(f.fd); err != nil {
+ t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
} else {
- t.s.logf("table@remove removed @%d", num)
+ t.s.logf("table@remove removed @%d", f.fd.Num)
}
if t.bcache != nil {
- t.bcache.EvictNS(num)
+ t.bcache.EvictNS(uint64(f.fd.Num))
}
})
}
@@ -471,9 +473,9 @@
type tWriter struct {
t *tOps
- file storage.File
- w storage.Writer
- tw *table.Writer
+ fd storage.FileDesc
+ w storage.Writer
+ tw *table.Writer
first, last []byte
}
@@ -513,16 +515,15 @@
return
}
}
- f = newTableFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last))
+ f = newTableFile(w.fd, int64(w.tw.BytesLen()), iKey(w.first), iKey(w.last))
return
}
// Drops the table.
func (w *tWriter) drop() {
w.close()
- w.file.Remove()
- w.t.s.reuseFileNum(w.file.Num())
- w.file = nil
+ w.t.s.stor.Remove(w.fd)
+ w.t.s.reuseFileNum(w.fd.Num)
w.tw = nil
w.first = nil
w.last = nil
diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go
index 23c7c61..caeac96 100644
--- a/leveldb/table/reader.go
+++ b/leveldb/table/reader.go
@@ -507,7 +507,7 @@
// Reader is a table reader.
type Reader struct {
mu sync.RWMutex
- fi *storage.FileInfo
+ fd storage.FileDesc
reader io.ReaderAt
cache *cache.CacheGetter
err error
@@ -539,7 +539,7 @@
}
func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
- return &errors.ErrCorrupted{File: r.fi, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
+ return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
}
func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
@@ -551,7 +551,7 @@
cerr.Pos = int64(bh.offset)
cerr.Size = int64(bh.length)
cerr.Kind = r.blockKind(bh)
- return &errors.ErrCorrupted{File: r.fi, Err: cerr}
+ return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
}
return err
}
@@ -988,13 +988,13 @@
// The fi, cache and bpool is optional and can be nil.
//
// The returned table reader instance is goroutine-safe.
-func NewReader(f io.ReaderAt, size int64, fi *storage.FileInfo, cache *cache.CacheGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
+func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.CacheGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
if f == nil {
return nil, errors.New("leveldb/table: nil file")
}
r := &Reader{
- fi: fi,
+ fd: fd,
reader: f,
cache: cache,
bpool: bpool,
diff --git a/leveldb/table/table_test.go b/leveldb/table/table_test.go
index 4b59b31..1bc73ed 100644
--- a/leveldb/table/table_test.go
+++ b/leveldb/table/table_test.go
@@ -14,6 +14,7 @@
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/testutil"
"github.com/syndtr/goleveldb/leveldb/util"
)
@@ -59,7 +60,7 @@
It("Should be able to approximate offset of a key correctly", func() {
Expect(err).ShouldNot(HaveOccurred())
- tr, err := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, nil, o)
+ tr, err := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), storage.FileDesc{}, nil, nil, o)
Expect(err).ShouldNot(HaveOccurred())
CheckOffset := func(key string, expect, threshold int) {
offset, err := tr.OffsetOf([]byte(key))
@@ -96,7 +97,7 @@
tw.Close()
// Opening the table.
- tr, _ := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), nil, nil, nil, o)
+ tr, _ := NewReader(bytes.NewReader(buf.Bytes()), int64(buf.Len()), storage.FileDesc{}, nil, nil, o)
return tableWrapper{tr}
}
Test := func(kv *testutil.KeyValue, body func(r *Reader)) func() {
diff --git a/leveldb/testutil/storage.go b/leveldb/testutil/storage.go
index 59c496d..ad42152 100644
--- a/leveldb/testutil/storage.go
+++ b/leveldb/testutil/storage.go
@@ -10,6 +10,7 @@
"bytes"
"fmt"
"io"
+ "math/rand"
"os"
"path/filepath"
"runtime"
@@ -35,6 +36,7 @@
ModeOpen StorageMode = 1 << iota
ModeCreate
ModeRemove
+ ModeRename
ModeRead
ModeWrite
ModeSync
@@ -45,6 +47,7 @@
modeOpen = iota
modeCreate
modeRemove
+ modeRename
modeRead
modeWrite
modeSync
@@ -73,6 +76,8 @@
x = modeCreate
case ModeRemove:
x = modeRemove
+ case ModeRename:
+ x = modeRename
case ModeRead:
x = modeRead
case ModeWrite:
@@ -121,6 +126,8 @@
add(modeCreate)
case m&ModeRemove != 0:
add(modeRemove)
+ case m&ModeRename != 0:
+ add(modeRename)
case m&ModeRead != 0:
add(modeRead)
case m&ModeWrite != 0:
@@ -133,15 +140,15 @@
return ret
}
-func packFile(num uint64, t storage.FileType) uint64 {
- if num>>(64-typeCount) != 0 {
+func packFile(fd storage.FileDesc) uint64 {
+ if fd.Num>>(63-typeCount) != 0 {
panic("overflow")
}
- return num<<typeCount | uint64(t)
+ return uint64(fd.Num<<typeCount) | uint64(fd.Type)
}
-func unpackFile(x uint64) (uint64, storage.FileType) {
- return x >> typeCount, storage.FileType(x) & storage.TypeAll
+func unpackFile(x uint64) storage.FileDesc {
+ return storage.FileDesc{storage.FileType(x) & storage.TypeAll, int64(x >> typeCount)}
}
type emulatedError struct {
@@ -163,189 +170,98 @@
}
type reader struct {
- f *file
+ s *Storage
+ fd storage.FileDesc
storage.Reader
}
func (r *reader) Read(p []byte) (n int, err error) {
- err = r.f.s.emulateError(ModeRead, r.f.Type())
+ err = r.s.emulateError(ModeRead, r.fd.Type)
if err == nil {
- r.f.s.stall(ModeRead, r.f.Type())
+ r.s.stall(ModeRead, r.fd.Type)
n, err = r.Reader.Read(p)
}
- r.f.s.count(ModeRead, r.f.Type(), n)
+ r.s.count(ModeRead, r.fd.Type, n)
if err != nil && err != io.EOF {
- r.f.s.logI("read error, num=%d type=%v n=%d err=%v", r.f.Num(), r.f.Type(), n, err)
+ r.s.logI("read error, fd=%s n=%d err=%v", r.fd, n, err)
}
return
}
func (r *reader) ReadAt(p []byte, off int64) (n int, err error) {
- err = r.f.s.emulateError(ModeRead, r.f.Type())
+ err = r.s.emulateError(ModeRead, r.fd.Type)
if err == nil {
- r.f.s.stall(ModeRead, r.f.Type())
+ r.s.stall(ModeRead, r.fd.Type)
n, err = r.Reader.ReadAt(p, off)
}
- r.f.s.count(ModeRead, r.f.Type(), n)
+ r.s.count(ModeRead, r.fd.Type, n)
if err != nil && err != io.EOF {
- r.f.s.logI("readAt error, num=%d type=%v offset=%d n=%d err=%v", r.f.Num(), r.f.Type(), off, n, err)
+ r.s.logI("readAt error, fd=%s offset=%d n=%d err=%v", r.fd, off, n, err)
}
return
}
func (r *reader) Close() (err error) {
- return r.f.doClose(r.Reader)
+ return r.s.fileClose(r.fd, r.Reader)
}
type writer struct {
- f *file
+ s *Storage
+ fd storage.FileDesc
storage.Writer
}
func (w *writer) Write(p []byte) (n int, err error) {
- err = w.f.s.emulateError(ModeWrite, w.f.Type())
+ err = w.s.emulateError(ModeWrite, w.fd.Type)
if err == nil {
- w.f.s.stall(ModeWrite, w.f.Type())
+ w.s.stall(ModeWrite, w.fd.Type)
n, err = w.Writer.Write(p)
}
- w.f.s.count(ModeWrite, w.f.Type(), n)
+ w.s.count(ModeWrite, w.fd.Type, n)
if err != nil && err != io.EOF {
- w.f.s.logI("write error, num=%d type=%v n=%d err=%v", w.f.Num(), w.f.Type(), n, err)
+ w.s.logI("write error, fd=%s n=%d err=%v", w.fd, n, err)
}
return
}
func (w *writer) Sync() (err error) {
- err = w.f.s.emulateError(ModeSync, w.f.Type())
+ err = w.s.emulateError(ModeSync, w.fd.Type)
if err == nil {
- w.f.s.stall(ModeSync, w.f.Type())
+ w.s.stall(ModeSync, w.fd.Type)
err = w.Writer.Sync()
}
- w.f.s.count(ModeSync, w.f.Type(), 0)
+ w.s.count(ModeSync, w.fd.Type, 0)
if err != nil {
- w.f.s.logI("sync error, num=%d type=%v err=%v", w.f.Num(), w.f.Type(), err)
+ w.s.logI("sync error, fd=%s err=%v", w.fd, err)
}
return
}
func (w *writer) Close() (err error) {
- return w.f.doClose(w.Writer)
-}
-
-type file struct {
- s *Storage
- storage.File
-}
-
-func (f *file) pack() uint64 {
- return packFile(f.Num(), f.Type())
-}
-
-func (f *file) assertOpen() {
- ExpectWithOffset(2, f.s.opens).NotTo(HaveKey(f.pack()), "File open, num=%d type=%v writer=%v", f.Num(), f.Type(), f.s.opens[f.pack()])
-}
-
-func (f *file) doClose(closer io.Closer) (err error) {
- err = f.s.emulateError(ModeClose, f.Type())
- if err == nil {
- f.s.stall(ModeClose, f.Type())
- }
- f.s.mu.Lock()
- defer f.s.mu.Unlock()
- if err == nil {
- ExpectWithOffset(2, f.s.opens).To(HaveKey(f.pack()), "File closed, num=%d type=%v", f.Num(), f.Type())
- err = closer.Close()
- }
- f.s.countNB(ModeClose, f.Type(), 0)
- writer := f.s.opens[f.pack()]
- if err != nil {
- f.s.logISkip(1, "file close failed, num=%d type=%v writer=%v err=%v", f.Num(), f.Type(), writer, err)
- } else {
- f.s.logISkip(1, "file closed, num=%d type=%v writer=%v", f.Num(), f.Type(), writer)
- delete(f.s.opens, f.pack())
- }
- return
-}
-
-func (f *file) Open() (r storage.Reader, err error) {
- err = f.s.emulateError(ModeOpen, f.Type())
- if err == nil {
- f.s.stall(ModeOpen, f.Type())
- }
- f.s.mu.Lock()
- defer f.s.mu.Unlock()
- if err == nil {
- f.assertOpen()
- f.s.countNB(ModeOpen, f.Type(), 0)
- r, err = f.File.Open()
- }
- if err != nil {
- f.s.logI("file open failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
- } else {
- f.s.logI("file opened, num=%d type=%v", f.Num(), f.Type())
- f.s.opens[f.pack()] = false
- r = &reader{f, r}
- }
- return
-}
-
-func (f *file) Create() (w storage.Writer, err error) {
- err = f.s.emulateError(ModeCreate, f.Type())
- if err == nil {
- f.s.stall(ModeCreate, f.Type())
- }
- f.s.mu.Lock()
- defer f.s.mu.Unlock()
- if err == nil {
- f.assertOpen()
- f.s.countNB(ModeCreate, f.Type(), 0)
- w, err = f.File.Create()
- }
- if err != nil {
- f.s.logI("file create failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
- } else {
- f.s.logI("file created, num=%d type=%v", f.Num(), f.Type())
- f.s.opens[f.pack()] = true
- w = &writer{f, w}
- }
- return
-}
-
-func (f *file) Remove() (err error) {
- err = f.s.emulateError(ModeRemove, f.Type())
- if err == nil {
- f.s.stall(ModeRemove, f.Type())
- }
- f.s.mu.Lock()
- defer f.s.mu.Unlock()
- if err == nil {
- f.assertOpen()
- f.s.countNB(ModeRemove, f.Type(), 0)
- err = f.File.Remove()
- }
- if err != nil {
- f.s.logI("file remove failed, num=%d type=%v err=%v", f.Num(), f.Type(), err)
- } else {
- f.s.logI("file removed, num=%d type=%v", f.Num(), f.Type())
- }
- return
+ return w.s.fileClose(w.fd, w.Writer)
}
type Storage struct {
storage.Storage
- closeFn func() error
+ path string
+ onClose func() (preserve bool, err error)
+ onLog func(str string)
lmu sync.Mutex
lb bytes.Buffer
- mu sync.Mutex
+ mu sync.Mutex
+ rand *rand.Rand
// Open files, true=writer, false=reader
- opens map[uint64]bool
- counters [flattenCount]int
- bytesCounter [flattenCount]int64
- emulatedError [flattenCount]error
- stallCond sync.Cond
- stalled [flattenCount]bool
+ opens map[uint64]bool
+ counters [flattenCount]int
+ bytesCounter [flattenCount]int64
+ emulatedError [flattenCount]error
+ emulatedErrorOnce [flattenCount]bool
+ emulatedRandomError [flattenCount]error
+ emulatedRandomErrorProb [flattenCount]float64
+ stallCond sync.Cond
+ stalled [flattenCount]bool
}
func (s *Storage) log(skip int, str string) {
@@ -374,7 +290,12 @@
}
s.lb.WriteString(line)
}
- s.lb.WriteByte('\n')
+ if s.onLog != nil {
+ s.onLog(s.lb.String())
+ s.lb.Reset()
+ } else {
+ s.lb.WriteByte('\n')
+ }
}
func (s *Storage) logISkip(skip int, format string, args ...interface{}) {
@@ -395,74 +316,220 @@
s.logISkip(1, format, args...)
}
+func (s *Storage) OnLog(onLog func(log string)) {
+ s.lmu.Lock()
+ s.onLog = onLog
+ if s.lb.Len() != 0 {
+ log := s.lb.String()
+ s.onLog(log[:len(log)-1])
+ s.lb.Reset()
+ }
+ s.lmu.Unlock()
+}
+
func (s *Storage) Log(str string) {
s.log(1, "Log: "+str)
s.Storage.Log(str)
}
-func (s *Storage) Lock() (r util.Releaser, err error) {
- r, err = s.Storage.Lock()
+func (s *Storage) Lock() (l storage.Lock, err error) {
+ l, err = s.Storage.Lock()
if err != nil {
s.logI("storage locking failed, err=%v", err)
} else {
s.logI("storage locked")
- r = storageLock{s, r}
+ l = storageLock{s, l}
}
return
}
-func (s *Storage) GetFile(num uint64, t storage.FileType) storage.File {
- return &file{s, s.Storage.GetFile(num, t)}
-}
-
-func (s *Storage) GetFiles(t storage.FileType) (files []storage.File, err error) {
- rfiles, err := s.Storage.GetFiles(t)
+func (s *Storage) List(t storage.FileType) (fds []storage.FileDesc, err error) {
+ fds, err = s.Storage.List(t)
if err != nil {
- s.logI("get files failed, err=%v", err)
+ s.logI("list failed, err=%v", err)
return
}
- files = make([]storage.File, len(rfiles))
- for i, f := range rfiles {
- files[i] = &file{s, f}
- }
- s.logI("get files, type=0x%x count=%d", int(t), len(files))
+ s.logI("list, type=0x%x count=%d", int(t), len(fds))
return
}
-func (s *Storage) GetManifest() (f storage.File, err error) {
- manifest, err := s.Storage.GetManifest()
+func (s *Storage) GetMeta() (fd storage.FileDesc, err error) {
+ fd, err = s.Storage.GetMeta()
if err != nil {
if !os.IsNotExist(err) {
- s.logI("get manifest failed, err=%v", err)
+ s.logI("get meta failed, err=%v", err)
}
return
}
- s.logI("get manifest, num=%d", manifest.Num())
- return &file{s, manifest}, nil
+ s.logI("get meta, fd=%s", fd)
+ return
}
-func (s *Storage) SetManifest(f storage.File) error {
- f_, ok := f.(*file)
- ExpectWithOffset(1, ok).To(BeTrue())
- ExpectWithOffset(1, f_.Type()).To(Equal(storage.TypeManifest))
- err := s.Storage.SetManifest(f_.File)
+func (s *Storage) SetMeta(fd storage.FileDesc) error {
+ ExpectWithOffset(1, fd.Type).To(Equal(storage.TypeManifest))
+ err := s.Storage.SetMeta(fd)
if err != nil {
- s.logI("set manifest failed, err=%v", err)
+ s.logI("set meta failed, fd=%s err=%v", fd, err)
} else {
- s.logI("set manifest, num=%d", f_.Num())
+ s.logI("set meta, fd=%s", fd)
}
return err
}
+func (s *Storage) fileClose(fd storage.FileDesc, closer io.Closer) (err error) {
+ err = s.emulateError(ModeClose, fd.Type)
+ if err == nil {
+ s.stall(ModeClose, fd.Type)
+ }
+ x := packFile(fd)
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err == nil {
+ ExpectWithOffset(2, s.opens).To(HaveKey(x), "File closed, fd=%s", fd)
+ err = closer.Close()
+ }
+ s.countNB(ModeClose, fd.Type, 0)
+ writer := s.opens[x]
+ if err != nil {
+ s.logISkip(1, "file close failed, fd=%s writer=%v err=%v", fd, writer, err)
+ } else {
+ s.logISkip(1, "file closed, fd=%s writer=%v", fd, writer)
+ delete(s.opens, x)
+ }
+ return
+}
+
+func (s *Storage) assertOpen(fd storage.FileDesc) {
+ x := packFile(fd)
+ ExpectWithOffset(2, s.opens).NotTo(HaveKey(x), "File open, fd=%s writer=%v", fd, s.opens[x])
+}
+
+func (s *Storage) Open(fd storage.FileDesc) (r storage.Reader, err error) {
+ err = s.emulateError(ModeOpen, fd.Type)
+ if err == nil {
+ s.stall(ModeOpen, fd.Type)
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err == nil {
+ s.assertOpen(fd)
+ s.countNB(ModeOpen, fd.Type, 0)
+ r, err = s.Storage.Open(fd)
+ }
+ if err != nil {
+ s.logI("file open failed, fd=%s err=%v", fd, err)
+ } else {
+ s.logI("file opened, fd=%s", fd)
+ s.opens[packFile(fd)] = false
+ r = &reader{s, fd, r}
+ }
+ return
+}
+
+func (s *Storage) Create(fd storage.FileDesc) (w storage.Writer, err error) {
+ err = s.emulateError(ModeCreate, fd.Type)
+ if err == nil {
+ s.stall(ModeCreate, fd.Type)
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err == nil {
+ s.assertOpen(fd)
+ s.countNB(ModeCreate, fd.Type, 0)
+ w, err = s.Storage.Create(fd)
+ }
+ if err != nil {
+ s.logI("file create failed, fd=%s err=%v", fd, err)
+ } else {
+ s.logI("file created, fd=%s", fd)
+ s.opens[packFile(fd)] = true
+ w = &writer{s, fd, w}
+ }
+ return
+}
+
+func (s *Storage) Remove(fd storage.FileDesc) (err error) {
+ err = s.emulateError(ModeRemove, fd.Type)
+ if err == nil {
+ s.stall(ModeRemove, fd.Type)
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err == nil {
+ s.assertOpen(fd)
+ s.countNB(ModeRemove, fd.Type, 0)
+ err = s.Storage.Remove(fd)
+ }
+ if err != nil {
+ s.logI("file remove failed, fd=%s err=%v", fd, err)
+ } else {
+ s.logI("file removed, fd=%s", fd)
+ }
+ return
+}
+
+func (s *Storage) ForceRemove(fd storage.FileDesc) (err error) {
+ s.countNB(ModeRemove, fd.Type, 0)
+ if err = s.Storage.Remove(fd); err != nil {
+ s.logI("file remove failed (forced), fd=%s err=%v", fd, err)
+ } else {
+ s.logI("file removed (forced), fd=%s", fd)
+ }
+ return
+}
+
+func (s *Storage) Rename(oldfd, newfd storage.FileDesc) (err error) {
+ err = s.emulateError(ModeRename, oldfd.Type)
+ if err == nil {
+ s.stall(ModeRename, oldfd.Type)
+ }
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err == nil {
+ s.assertOpen(oldfd)
+ s.assertOpen(newfd)
+ s.countNB(ModeRename, oldfd.Type, 0)
+ err = s.Storage.Rename(oldfd, newfd)
+ }
+ if err != nil {
+ s.logI("file rename failed, oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
+ } else {
+ s.logI("file renamed, oldfd=%s newfd=%s", oldfd, newfd)
+ }
+ return
+}
+
+func (s *Storage) ForceRename(oldfd, newfd storage.FileDesc) (err error) {
+ s.countNB(ModeRename, oldfd.Type, 0)
+ if err = s.Storage.Rename(oldfd, newfd); err != nil {
+ s.logI("file rename failed (forced), oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
+ } else {
+ s.logI("file renamed (forced), oldfd=%s newfd=%s", oldfd, newfd)
+ }
+ return
+}
+
func (s *Storage) openFiles() string {
out := "Open files:"
for x, writer := range s.opens {
- num, t := unpackFile(x)
- out += fmt.Sprintf("\n · num=%d type=%v writer=%v", num, t, writer)
+ fd := unpackFile(x)
+ out += fmt.Sprintf("\n · fd=%s writer=%v", fd, writer)
}
return out
}
+func (s *Storage) CloseCheck() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
+}
+
+func (s *Storage) OnClose(onClose func() (preserve bool, err error)) {
+ s.mu.Lock()
+ s.onClose = onClose
+ s.mu.Unlock()
+}
+
func (s *Storage) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -473,9 +540,22 @@
} else {
s.logI("storage closed")
}
- if s.closeFn != nil {
- if err1 := s.closeFn(); err1 != nil {
- s.logI("close func error, err=%v", err1)
+ var preserve bool
+ if s.onClose != nil {
+ var err0 error
+ if preserve, err0 = s.onClose(); err0 != nil {
+ s.logI("onClose error, err=%v", err0)
+ }
+ }
+ if s.path != "" {
+ if storageKeepFS || preserve {
+ s.logI("storage is preserved, path=%v", s.path)
+ } else {
+ if err1 := os.RemoveAll(s.path); err1 != nil {
+ s.logI("cannot remove storage, err=%v", err1)
+ } else {
+ s.logI("storage has been removed")
+ }
}
}
return err
@@ -510,8 +590,14 @@
func (s *Storage) emulateError(m StorageMode, t storage.FileType) error {
s.mu.Lock()
defer s.mu.Unlock()
- err := s.emulatedError[flattenType(m, t)]
- if err != nil {
+ x := flattenType(m, t)
+ if err := s.emulatedError[x]; err != nil {
+ if s.emulatedErrorOnce[x] {
+ s.emulatedError[x] = nil
+ }
+ return emulatedError{err}
+ }
+ if err := s.emulatedRandomError[x]; err != nil && s.rand.Float64() < s.emulatedRandomErrorProb[x] {
return emulatedError{err}
}
return nil
@@ -522,6 +608,25 @@
defer s.mu.Unlock()
for _, x := range listFlattenType(m, t) {
s.emulatedError[x] = err
+ s.emulatedErrorOnce[x] = false
+ }
+}
+
+func (s *Storage) EmulateErrorOnce(m StorageMode, t storage.FileType, err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for _, x := range listFlattenType(m, t) {
+ s.emulatedError[x] = err
+ s.emulatedErrorOnce[x] = true
+ }
+}
+
+func (s *Storage) EmulateRandomError(m StorageMode, t storage.FileType, prob float64, err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for _, x := range listFlattenType(m, t) {
+ s.emulatedRandomError[x] = err
+ s.emulatedRandomErrorProb[x] = prob
}
}
@@ -552,24 +657,20 @@
}
func NewStorage() *Storage {
- var stor storage.Storage
- var closeFn func() error
+ var (
+ stor storage.Storage
+ path string
+ )
if storageUseFS {
for {
storageMu.Lock()
num := storageNum
storageNum++
storageMu.Unlock()
- path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
+ path = filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
if _, err := os.Stat(path); os.IsNotExist(err) {
stor, err = storage.OpenFile(path)
ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path)
- closeFn = func() error {
- if storageKeepFS {
- return nil
- }
- return os.RemoveAll(path)
- }
break
}
}
@@ -578,9 +679,16 @@
}
s := &Storage{
Storage: stor,
- closeFn: closeFn,
+ path: path,
+ rand: NewRand(),
opens: make(map[uint64]bool),
}
s.stallCond.L = &s.mu
+ if s.path != "" {
+ s.logI("using FS storage")
+ s.logI("storage path: %s", s.path)
+ } else {
+ s.logI("using MEM storage")
+ }
return s
}
diff --git a/leveldb/util.go b/leveldb/util.go
index 1a5bf71..3b663d1 100644
--- a/leveldb/util.go
+++ b/leveldb/util.go
@@ -72,20 +72,20 @@
return b
}
-type files []storage.File
+type fdSorter []storage.FileDesc
-func (p files) Len() int {
+func (p fdSorter) Len() int {
return len(p)
}
-func (p files) Less(i, j int) bool {
- return p[i].Num() < p[j].Num()
+func (p fdSorter) Less(i, j int) bool {
+ return p[i].Num < p[j].Num
}
-func (p files) Swap(i, j int) {
+func (p fdSorter) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
-func (p files) sort() {
- sort.Sort(p)
+func sortFds(fds []storage.FileDesc) {
+ sort.Sort(fdSorter(fds))
}
diff --git a/leveldb/version.go b/leveldb/version.go
index 015dc72..16b57be 100644
--- a/leveldb/version.go
+++ b/leveldb/version.go
@@ -52,17 +52,17 @@
panic("negative version ref")
}
- nextTables := make(map[uint64]bool)
+ nextTables := make(map[int64]bool)
for _, tt := range v.next.levels {
for _, t := range tt {
- num := t.file.Num()
+ num := t.fd.Num
nextTables[num] = true
}
}
for _, tt := range v.levels {
for _, t := range tt {
- num := t.file.Num()
+ num := t.fd.Num
if _, ok := nextTables[num]; !ok {
v.s.tops.remove(t)
}
@@ -275,7 +275,7 @@
for _, t := range tables {
if v.s.icmp.Compare(t.imax, ikey) <= 0 {
// Entire file is before "ikey", so just add the file size
- n += t.size
+ n += uint64(t.size)
} else if v.s.icmp.Compare(t.imin, ikey) > 0 {
// Entire file is after "ikey", so ignore
if level > 0 {
@@ -315,7 +315,7 @@
}
if gpLevel := level + 2; gpLevel < len(v.levels) {
overlaps = v.levels[gpLevel].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
- if overlaps.size() > uint64(v.s.o.GetCompactionGPOverlaps(level)) {
+ if overlaps.size() > int64(v.s.o.GetCompactionGPOverlaps(level)) {
break
}
}
@@ -333,7 +333,7 @@
statFiles := make([]int, len(v.levels))
statSizes := make([]string, len(v.levels))
statScore := make([]string, len(v.levels))
- statTotSize := uint64(0)
+ statTotSize := int64(0)
for level, tables := range v.levels {
var score float64
@@ -377,8 +377,8 @@
}
type tablesScratch struct {
- added map[uint64]atRecord
- deleted map[uint64]struct{}
+ added map[int64]atRecord
+ deleted map[int64]struct{}
}
type versionStaging struct {
@@ -401,7 +401,7 @@
scratch := p.getScratch(r.level)
if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 {
if scratch.deleted == nil {
- scratch.deleted = make(map[uint64]struct{})
+ scratch.deleted = make(map[int64]struct{})
}
scratch.deleted[r.num] = struct{}{}
}
@@ -414,7 +414,7 @@
for _, r := range r.addedTables {
scratch := p.getScratch(r.level)
if scratch.added == nil {
- scratch.added = make(map[uint64]atRecord)
+ scratch.added = make(map[int64]atRecord)
}
scratch.added[r.num] = r
if scratch.deleted != nil {
@@ -448,10 +448,10 @@
// Base tables.
for _, t := range baseTabels {
- if _, ok := scratch.deleted[t.file.Num()]; ok {
+ if _, ok := scratch.deleted[t.fd.Num]; ok {
continue
}
- if _, ok := scratch.added[t.file.Num()]; ok {
+ if _, ok := scratch.added[t.fd.Num]; ok {
continue
}
nt = append(nt, t)
@@ -459,7 +459,7 @@
// New tables.
for _, r := range scratch.added {
- nt = append(nt, p.base.s.tableFileFromRecord(r))
+ nt = append(nt, tableFileFromRecord(r))
}
if len(nt) != 0 {
diff --git a/leveldb/version_test.go b/leveldb/version_test.go
index 87d54fb..b400883 100644
--- a/leveldb/version_test.go
+++ b/leveldb/version_test.go
@@ -4,15 +4,20 @@
"encoding/binary"
"reflect"
"testing"
+
+ "github.com/onsi/gomega"
+
+ "github.com/syndtr/goleveldb/leveldb/testutil"
)
type testFileRec struct {
level int
- num uint64
+ num int64
}
func TestVersionStaging(t *testing.T) {
- stor := newTestStorage(t)
+ gomega.RegisterTestingT(t)
+ stor := testutil.NewStorage()
defer stor.Close()
s, err := newSession(stor, nil)
if err != nil {
@@ -30,13 +35,13 @@
for i, x := range []struct {
add, del []testFileRec
- levels [][]uint64
+ levels [][]int64
}{
{
add: []testFileRec{
{1, 1},
},
- levels: [][]uint64{
+ levels: [][]int64{
{},
{1},
},
@@ -45,7 +50,7 @@
add: []testFileRec{
{1, 1},
},
- levels: [][]uint64{
+ levels: [][]int64{
{},
{1},
},
@@ -54,7 +59,7 @@
del: []testFileRec{
{1, 1},
},
- levels: [][]uint64{},
+ levels: [][]int64{},
},
{
add: []testFileRec{
@@ -64,7 +69,7 @@
{2, 5},
{1, 4},
},
- levels: [][]uint64{
+ levels: [][]int64{
{3, 2, 1},
{4},
{5},
@@ -79,7 +84,7 @@
{0, 1},
{0, 4},
},
- levels: [][]uint64{
+ levels: [][]int64{
{3, 2},
{4, 6},
{5},
@@ -93,13 +98,13 @@
{1, 6},
{2, 5},
},
- levels: [][]uint64{},
+ levels: [][]int64{},
},
{
add: []testFileRec{
{0, 1},
},
- levels: [][]uint64{
+ levels: [][]int64{
{1},
},
},
@@ -107,7 +112,7 @@
add: []testFileRec{
{1, 2},
},
- levels: [][]uint64{
+ levels: [][]int64{
{1},
{2},
},
@@ -116,7 +121,7 @@
add: []testFileRec{
{0, 3},
},
- levels: [][]uint64{
+ levels: [][]int64{
{3, 1},
{2},
},
@@ -125,7 +130,7 @@
add: []testFileRec{
{6, 9},
},
- levels: [][]uint64{
+ levels: [][]int64{
{3, 1},
{2},
{},
@@ -139,7 +144,7 @@
del: []testFileRec{
{6, 9},
},
- levels: [][]uint64{
+ levels: [][]int64{
{3, 1},
{2},
},
@@ -147,7 +152,7 @@
} {
rec := &sessionRecord{}
for _, f := range x.add {
- ik := makeIKey(f.num)
+ ik := makeIKey(uint64(f.num))
rec.addTable(f.level, f.num, 1, ik, ik)
}
for _, f := range x.del {
@@ -164,9 +169,9 @@
if len(want) != len(tables) {
t.Fatalf("#%d.%d: invalid tables count: want=%d got=%d", i, j, len(want), len(tables))
}
- got := make([]uint64, len(tables))
+ got := make([]int64, len(tables))
for k, t := range tables {
- got[k] = t.file.Num()
+ got[k] = t.fd.Num
}
if !reflect.DeepEqual(want, got) {
t.Fatalf("#%d.%d: invalid tables: want=%v got=%v", i, j, want, got)
diff --git a/manualtest/dbstress/key.go b/manualtest/dbstress/key.go
index 906d0d8..c9f6963 100644
--- a/manualtest/dbstress/key.go
+++ b/manualtest/dbstress/key.go
@@ -5,6 +5,7 @@
"fmt"
"github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/storage"
)
type ErrIkeyCorrupted struct {
@@ -17,7 +18,7 @@
}
func newErrIkeyCorrupted(ikey []byte, reason string) error {
- return errors.NewErrCorrupted(nil, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
+ return errors.NewErrCorrupted(storage.FileDesc{}, &ErrIkeyCorrupted{append([]byte{}, ikey...), reason})
}
type kType int
diff --git a/manualtest/dbstress/main.go b/manualtest/dbstress/main.go
index cb08897..9ca3204 100644
--- a/manualtest/dbstress/main.go
+++ b/manualtest/dbstress/main.go
@@ -145,52 +145,98 @@
return util.BytesPrefix([]byte{ns})
}
-type testingFile struct {
- storage.File
-}
-
-func (tf *testingFile) Remove() error {
- if atomic.LoadUint32(&fail) == 1 {
- return nil
- }
-
- if tf.Type() == storage.TypeTable {
- if scanTable(tf, true) {
- return nil
- }
- }
- return tf.File.Remove()
-}
-
type testingStorage struct {
storage.Storage
}
-func (ts *testingStorage) GetFile(num uint64, t storage.FileType) storage.File {
- return &testingFile{ts.Storage.GetFile(num, t)}
-}
-
-func (ts *testingStorage) GetFiles(t storage.FileType) ([]storage.File, error) {
- files, err := ts.Storage.GetFiles(t)
+func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) {
+ r, err := ts.Open(fd)
if err != nil {
- return nil, err
+ log.Fatal(err)
}
- for i := range files {
- files[i] = &testingFile{files[i]}
+ defer r.Close()
+
+ size, err := r.Seek(0, os.SEEK_END)
+ if err != nil {
+ log.Fatal(err)
}
- return files, nil
+
+ o := &opt.Options{Strict: opt.NoStrict}
+ if checksum {
+ o.Strict = opt.StrictBlockChecksum | opt.StrictReader
+ }
+ tr, err := table.NewReader(r, size, fd, nil, bpool, o)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer tr.Release()
+
+ checkData := func(i int, t string, data []byte) bool {
+ if len(data) == 0 {
+ panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t))
+ }
+
+ checksum0, checksum1 := dataChecksum(data)
+ if checksum0 != checksum1 {
+ atomic.StoreUint32(&fail, 1)
+ atomic.StoreUint32(&done, 1)
+ corrupted = true
+
+ data0, data1 := dataSplit(data)
+ data0c0, data0c1 := dataChecksum(data0)
+ data1c0, data1c1 := dataChecksum(data1)
+ log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
+ fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
+ return true
+ }
+ return false
+ }
+
+ iter := tr.NewIterator(nil, nil)
+ defer iter.Release()
+ for i := 0; iter.Next(); i++ {
+ ukey, _, kt, kerr := parseIkey(iter.Key())
+ if kerr != nil {
+ atomic.StoreUint32(&fail, 1)
+ atomic.StoreUint32(&done, 1)
+ corrupted = true
+
+ log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr)
+ return
+ }
+ if checkData(i, "key", ukey) {
+ return
+ }
+ if kt == ktVal && checkData(i, "value", iter.Value()) {
+ return
+ }
+ }
+ if err := iter.Error(); err != nil {
+ if errors.IsCorrupted(err) {
+ atomic.StoreUint32(&fail, 1)
+ atomic.StoreUint32(&done, 1)
+ corrupted = true
+
+ log.Printf("FATAL: [%v] Corruption detected: %v", fd, err)
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+ return
}
-func (ts *testingStorage) GetManifest() (storage.File, error) {
- f, err := ts.Storage.GetManifest()
- if err == nil {
- f = &testingFile{f}
+func (ts *testingStorage) Remove(fd storage.FileDesc) error {
+ if atomic.LoadUint32(&fail) == 1 {
+ return nil
}
- return f, err
-}
-func (ts *testingStorage) SetManifest(f storage.File) error {
- return ts.Storage.SetManifest(f.(*testingFile).File)
+ if fd.Type == storage.TypeTable {
+ if ts.scanTable(fd, true) {
+ return nil
+ }
+ }
+ return ts.Storage.Remove(fd)
}
type latencyStats struct {
@@ -246,84 +292,6 @@
s.num += x.num
}
-func scanTable(f storage.File, checksum bool) (corrupted bool) {
- fi := storage.NewFileInfo(f)
- r, err := f.Open()
- if err != nil {
- log.Fatal(err)
- }
- defer r.Close()
-
- size, err := r.Seek(0, os.SEEK_END)
- if err != nil {
- log.Fatal(err)
- }
-
- o := &opt.Options{Strict: opt.NoStrict}
- if checksum {
- o.Strict = opt.StrictBlockChecksum | opt.StrictReader
- }
- tr, err := table.NewReader(r, size, fi, nil, bpool, o)
- if err != nil {
- log.Fatal(err)
- }
- defer tr.Release()
-
- checkData := func(i int, t string, data []byte) bool {
- if len(data) == 0 {
- panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fi, i, t))
- }
-
- checksum0, checksum1 := dataChecksum(data)
- if checksum0 != checksum1 {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
-
- data0, data1 := dataSplit(data)
- data0c0, data0c1 := dataChecksum(data0)
- data1c0, data1c1 := dataChecksum(data1)
- log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)",
- fi, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1)
- return true
- }
- return false
- }
-
- iter := tr.NewIterator(nil, nil)
- defer iter.Release()
- for i := 0; iter.Next(); i++ {
- ukey, _, kt, kerr := parseIkey(iter.Key())
- if kerr != nil {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
-
- log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fi, i, kerr)
- return
- }
- if checkData(i, "key", ukey) {
- return
- }
- if kt == ktVal && checkData(i, "value", iter.Value()) {
- return
- }
- }
- if err := iter.Error(); err != nil {
- if errors.IsCorrupted(err) {
- atomic.StoreUint32(&fail, 1)
- atomic.StoreUint32(&done, 1)
- corrupted = true
-
- log.Printf("FATAL: [%v] Corruption detected: %v", fi, err)
- } else {
- log.Fatal(err)
- }
- }
-
- return
-}
-
func main() {
flag.Parse()
@@ -349,8 +317,8 @@
if err != nil {
log.Fatal(err)
}
- stor = &testingStorage{stor}
- defer stor.Close()
+ tstor := &testingStorage{stor}
+ defer tstor.Close()
fatalf := func(err error, format string, v ...interface{}) {
atomic.StoreUint32(&fail, 1)
@@ -358,10 +326,10 @@
log.Printf("FATAL: "+format, v...)
if err != nil && errors.IsCorrupted(err) {
cerr := err.(*errors.ErrCorrupted)
- if cerr.File != nil && cerr.File.Type == storage.TypeTable {
+ if !cerr.Fd.Nil() && cerr.Fd.Type == storage.TypeTable {
log.Print("FATAL: corruption detected, scanning...")
- if !scanTable(stor.GetFile(cerr.File.Num, cerr.File.Type), false) {
- log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.File)
+ if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
+ log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)
}
}
}
@@ -382,7 +350,7 @@
o.Compression = opt.DefaultCompression
}
- db, err := leveldb.Open(stor, o)
+ db, err := leveldb.Open(tstor, o)
if err != nil {
log.Fatal(err)
}
@@ -533,7 +501,7 @@
getStat.record(1)
if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
- err := &errors.ErrCorrupted{File: &storage.FileInfo{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
+ err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
}