leveldb: allow opening the DB in read-only mode, or switching an open DB to read-only (closes #107)
diff --git a/leveldb/db.go b/leveldb/db.go
index caf9b07..def86bc 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -63,13 +63,14 @@
journalAckC chan error
// Compaction.
- tcompCmdC chan cCmd
- tcompPauseC chan chan<- struct{}
- mcompCmdC chan cCmd
- compErrC chan error
- compPerErrC chan error
- compErrSetC chan error
- compStats []cStats
+ tcompCmdC chan cCmd
+ tcompPauseC chan chan<- struct{}
+ mcompCmdC chan cCmd
+ compErrC chan error
+ compPerErrC chan error
+ compErrSetC chan error
+ compWriteLocking bool
+ compStats []cStats
// Close.
closeW sync.WaitGroup
@@ -108,28 +109,44 @@
closeC: make(chan struct{}),
}
- if err := db.recoverJournal(); err != nil {
- return nil, err
- }
+ // Read-only mode.
+ readOnly := s.o.GetReadOnly()
- // Remove any obsolete files.
- if err := db.checkAndCleanFiles(); err != nil {
- // Close journal.
- if db.journal != nil {
- db.journal.Close()
- db.journalWriter.Close()
+ if readOnly {
+ // Recover journals (read-only mode).
+ if err := db.recoverJournalRO(); err != nil {
+ return nil, err
}
- return nil, err
+ } else {
+ // Recover journals.
+ if err := db.recoverJournal(); err != nil {
+ return nil, err
+ }
+
+ // Remove any obsolete files.
+ if err := db.checkAndCleanFiles(); err != nil {
+ // Close journal.
+ if db.journal != nil {
+ db.journal.Close()
+ db.journalWriter.Close()
+ }
+ return nil, err
+ }
+
}
// Doesn't need to be included in the wait group.
go db.compactionError()
go db.mpoolDrain()
- db.closeW.Add(3)
- go db.tCompaction()
- go db.mCompaction()
- go db.jWriter()
+ if readOnly {
+ db.SetReadOnly()
+ } else {
+ db.closeW.Add(3)
+ go db.tCompaction()
+ go db.mCompaction()
+ go db.jWriter()
+ }
s.logf("db@open done T·%v", time.Since(start))
@@ -492,85 +509,89 @@
for _, jf := range recJournalFiles {
db.logf("journal@recovery recovering @%d", jf.Num())
- err := func() error {
- fr, err := jf.Open()
- if err != nil {
- return err
- }
- defer fr.Close()
-
- // Create or reset journal reader instance.
- if jr == nil {
- jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
- } else {
- jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
- }
-
- // Flush memdb and remove obsolete journal file.
- if of != nil {
- if mdb.Len() > 0 {
- if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
- return err
- }
- }
- rec.setJournalNum(jf.Num())
- rec.setSeqNum(db.seq)
- if err := db.s.commit(rec); err != nil {
- return err
- }
- rec.resetAddedTables()
- of.Remove()
- of = nil
- }
-
- // Replay journal to memdb.
- mdb.Reset()
- for {
- r, err := jr.Next()
- if err != nil {
- if err == io.EOF {
- break
- }
- return errors.SetFile(err, jf)
- }
-
- buf.Reset()
- if _, err := buf.ReadFrom(r); err != nil {
- if err == io.ErrUnexpectedEOF {
- // This is error returned due to corruption, with strict == false.
- continue
- } else {
- return errors.SetFile(err, jf)
- }
- }
- if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
- if strict || !errors.IsCorrupted(err) {
- return errors.SetFile(err, jf)
- } else {
- db.s.logf("journal error: %v (skipped)", err)
- // We won't apply sequence number as it might be corrupted.
- continue
- }
- }
-
- // Save sequence number.
- db.seq = batch.seq + uint64(batch.Len())
-
- // Flush it if large enough.
- if mdb.Size() >= writeBuffer {
- if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
- return err
- }
- mdb.Reset()
- }
- }
-
- of = jf
- return nil
- }()
+ fr, err := jf.Open()
if err != nil {
return err
}
+
+ // Create or reset journal reader instance.
+ if jr == nil {
+ jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+ } else {
+ jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+ }
+
+ // Flush memdb and remove obsolete journal file.
+ if of != nil {
+ if mdb.Len() > 0 {
+ if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
+ fr.Close()
+ return err
+ }
+ }
+
+ rec.setJournalNum(jf.Num())
+ rec.setSeqNum(db.seq)
+ if err := db.s.commit(rec); err != nil {
+ fr.Close()
+ return err
+ }
+ rec.resetAddedTables()
+
+ of.Remove()
+ of = nil
+ }
+
+ // Replay journal to memdb.
+ mdb.Reset()
+ for {
+ r, err := jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+
+ buf.Reset()
+ if _, err := buf.ReadFrom(r); err != nil {
+ if err == io.ErrUnexpectedEOF {
+ // This is error returned due to corruption, with strict == false.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+ if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ if !strict && errors.IsCorrupted(err) {
+ db.s.logf("journal error: %v (skipped)", err)
+ // We won't apply sequence number as it might be corrupted.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+
+ // Save sequence number.
+ db.seq = batch.seq + uint64(batch.Len())
+
+ // Flush it if large enough.
+ if mdb.Size() >= writeBuffer {
+ if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
+ fr.Close()
+ return err
+ }
+
+ mdb.Reset()
+ }
+ }
+
+ fr.Close()
+ of = jf
}
// Flush the last memdb.
@@ -606,6 +627,103 @@
return nil
}
+func (db *DB) recoverJournalRO() error {
+ // Get all journals and sort it by file number.
+ allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
+ if err != nil {
+ return err
+ }
+ files(allJournalFiles).sort()
+
+ // Journals that will be recovered.
+ var recJournalFiles []storage.File
+ for _, jf := range allJournalFiles {
+ if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
+ recJournalFiles = append(recJournalFiles, jf)
+ }
+ }
+
+ var (
+ // Options.
+ strict = db.s.o.GetStrict(opt.StrictJournal)
+ checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
+ writeBuffer = db.s.o.GetWriteBuffer()
+
+ mdb = memdb.New(db.s.icmp, writeBuffer)
+ )
+
+ // Recover journals.
+ if len(recJournalFiles) > 0 {
+ db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
+
+ var (
+ jr *journal.Reader
+ buf = &util.Buffer{}
+ batch = &Batch{}
+ )
+
+ for _, jf := range recJournalFiles {
+ db.logf("journal@recovery recovering @%d", jf.Num())
+
+ fr, err := jf.Open()
+ if err != nil {
+ return err
+ }
+
+ // Create or reset journal reader instance.
+ if jr == nil {
+ jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+ } else {
+ jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+ }
+
+ // Replay journal to memdb.
+ for {
+ r, err := jr.Next()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+
+ buf.Reset()
+ if _, err := buf.ReadFrom(r); err != nil {
+ if err == io.ErrUnexpectedEOF {
+ // This is error returned due to corruption, with strict == false.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+ if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+ if !strict && errors.IsCorrupted(err) {
+ db.s.logf("journal error: %v (skipped)", err)
+ // We won't apply sequence number as it might be corrupted.
+ continue
+ }
+
+ fr.Close()
+ return errors.SetFile(err, jf)
+ }
+
+ // Save sequence number.
+ db.seq = batch.seq + uint64(batch.Len())
+ }
+
+ fr.Close()
+ }
+ }
+
+ // Set memDB.
+ db.mem = &memDB{db: db, DB: mdb, ref: 1}
+
+ return nil
+}
+
func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := newIkey(key, seq, ktSeek)
@@ -902,6 +1020,9 @@
var err error
select {
case err = <-db.compErrC:
+ if err == ErrReadOnly {
+ err = nil
+ }
default:
}
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 3b4c469..2600310 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -62,10 +62,7 @@
}
func (db *DB) compactionError() {
- var (
- err error
- wlocked bool
- )
+ var err error
noerr:
// No error.
for {
@@ -73,7 +70,7 @@
case err = <-db.compErrSetC:
switch {
case err == nil:
- case errors.IsCorrupted(err):
+ case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
goto haserr
@@ -91,7 +88,7 @@
switch {
case err == nil:
goto noerr
- case errors.IsCorrupted(err):
+ case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
}
@@ -107,9 +104,9 @@
case db.compPerErrC <- err:
case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through.
- wlocked = true
+ db.compWriteLocking = true
case _, _ = <-db.closeC:
- if wlocked {
+ if db.compWriteLocking {
// We should release the lock or Close will hang.
<-db.writeLockC
}
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 2f60d17..9d91ebf 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2663,3 +2663,39 @@
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
testDB_IterTriggeredCompaction(t, 2)
}
+
+func TestDB_ReadOnly(t *testing.T) {
+ h := newDbHarness(t)
+ defer h.close()
+
+ h.put("foo", "v1")
+ h.put("bar", "v2")
+ h.compactMem()
+
+ h.put("xfoo", "v1")
+ h.put("xbar", "v2")
+
+ t.Log("Trigger read-only")
+ if err := h.db.SetReadOnly(); err != nil {
+ h.close()
+ t.Fatalf("SetReadOnly error: %v", err)
+ }
+
+ h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync)
+
+ ro := func(key, value, wantValue string) {
+ if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ h.getVal(key, wantValue)
+ }
+
+ ro("foo", "vx", "v1")
+
+ h.o.ReadOnly = true
+ h.reopenDB()
+
+ ro("foo", "vx", "v1")
+ ro("bar", "vx", "v2")
+ h.assertNumKeys(4)
+}
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index 1e412b3..99ee85d 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -309,3 +309,31 @@
// Table compaction.
return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
}
+
+// SetReadOnly makes DB read-only. It will stay read-only until reopened.
+func (db *DB) SetReadOnly() error {
+ if err := db.ok(); err != nil {
+ return err
+ }
+
+ // Lock writer.
+ select {
+ case db.writeLockC <- struct{}{}:
+ db.compWriteLocking = true
+ case err := <-db.compPerErrC:
+ return err
+ case _, _ = <-db.closeC:
+ return ErrClosed
+ }
+
+ // Set compaction read-only.
+ select {
+ case db.compErrSetC <- ErrReadOnly:
+ case perr := <-db.compPerErrC:
+ return perr
+ case _, _ = <-db.closeC:
+ return ErrClosed
+ }
+
+ return nil
+}
diff --git a/leveldb/errors.go b/leveldb/errors.go
index 29d0d2f..c8bd66a 100644
--- a/leveldb/errors.go
+++ b/leveldb/errors.go
@@ -12,6 +12,7 @@
var (
ErrNotFound = errors.ErrNotFound
+ ErrReadOnly = errors.New("leveldb: read-only mode")
ErrSnapshotReleased = errors.New("leveldb: snapshot released")
ErrIterReleased = errors.New("leveldb: iterator released")
ErrClosed = errors.New("leveldb: closed")
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 4464c4f..f9a309d 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -326,6 +326,11 @@
// The default value is 500.
OpenFilesCacheCapacity int
+ // If true then opens DB in read-only mode.
+ //
+ // The default value is false.
+ ReadOnly bool
+
// Strict defines the DB strict level.
Strict Strict
@@ -567,6 +572,13 @@
return o.OpenFilesCacheCapacity
}
+func (o *Options) GetReadOnly() bool {
+ if o == nil {
+ return false
+ }
+ return o.ReadOnly
+}
+
func (o *Options) GetStrict(strict Strict) bool {
if o == nil || o.Strict == 0 {
return DefaultStrict&strict != 0
diff --git a/leveldb/storage_test.go b/leveldb/storage_test.go
index dc1f1fb..08be0ba 100644
--- a/leveldb/storage_test.go
+++ b/leveldb/storage_test.go
@@ -42,6 +42,8 @@
const (
tsOpOpen tsOp = iota
tsOpCreate
+ tsOpReplace
+ tsOpRemove
tsOpRead
tsOpReadAt
tsOpWrite
@@ -241,6 +243,10 @@
if err != nil {
return
}
+ if tf.shouldErr(tsOpReplace) {
+		err = errors.New("leveldb.testStorage: emulated replace error")
+ return
+ }
err = tf.File.Replace(newfile.(tsFile).File)
if err != nil {
ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
@@ -258,6 +264,10 @@
if err != nil {
return
}
+ if tf.shouldErr(tsOpRemove) {
+ err = errors.New("leveldb.testStorage: emulated create error")
+		err = errors.New("leveldb.testStorage: emulated remove error")
+ }
err = tf.File.Remove()
if err != nil {
ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)