leveldb: allows open or puts DB into read-only mode (closes #107)
diff --git a/leveldb/db.go b/leveldb/db.go
index caf9b07..def86bc 100644
--- a/leveldb/db.go
+++ b/leveldb/db.go
@@ -63,13 +63,14 @@
 	journalAckC  chan error
 
 	// Compaction.
-	tcompCmdC   chan cCmd
-	tcompPauseC chan chan<- struct{}
-	mcompCmdC   chan cCmd
-	compErrC    chan error
-	compPerErrC chan error
-	compErrSetC chan error
-	compStats   []cStats
+	tcompCmdC        chan cCmd
+	tcompPauseC      chan chan<- struct{}
+	mcompCmdC        chan cCmd
+	compErrC         chan error
+	compPerErrC      chan error
+	compErrSetC      chan error
+	compWriteLocking bool
+	compStats        []cStats
 
 	// Close.
 	closeW sync.WaitGroup
@@ -108,28 +109,44 @@
 		closeC: make(chan struct{}),
 	}
 
-	if err := db.recoverJournal(); err != nil {
-		return nil, err
-	}
+	// Read-only mode.
+	readOnly := s.o.GetReadOnly()
 
-	// Remove any obsolete files.
-	if err := db.checkAndCleanFiles(); err != nil {
-		// Close journal.
-		if db.journal != nil {
-			db.journal.Close()
-			db.journalWriter.Close()
+	if readOnly {
+		// Recover journals (read-only mode).
+		if err := db.recoverJournalRO(); err != nil {
+			return nil, err
 		}
-		return nil, err
+	} else {
+		// Recover journals.
+		if err := db.recoverJournal(); err != nil {
+			return nil, err
+		}
+
+		// Remove any obsolete files.
+		if err := db.checkAndCleanFiles(); err != nil {
+			// Close journal.
+			if db.journal != nil {
+				db.journal.Close()
+				db.journalWriter.Close()
+			}
+			return nil, err
+		}
+
 	}
 
 	// Doesn't need to be included in the wait group.
 	go db.compactionError()
 	go db.mpoolDrain()
 
-	db.closeW.Add(3)
-	go db.tCompaction()
-	go db.mCompaction()
-	go db.jWriter()
+	if readOnly {
+		db.SetReadOnly()
+	} else {
+		db.closeW.Add(3)
+		go db.tCompaction()
+		go db.mCompaction()
+		go db.jWriter()
+	}
 
 	s.logf("db@open done T·%v", time.Since(start))
 
@@ -492,85 +509,89 @@
 		for _, jf := range recJournalFiles {
 			db.logf("journal@recovery recovering @%d", jf.Num())
 
-			err := func() error {
-				fr, err := jf.Open()
-				if err != nil {
-					return err
-				}
-				defer fr.Close()
-
-				// Create or reset journal reader instance.
-				if jr == nil {
-					jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
-				} else {
-					jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
-				}
-
-				// Flush memdb and remove obsolete journal file.
-				if of != nil {
-					if mdb.Len() > 0 {
-						if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
-							return err
-						}
-					}
-					rec.setJournalNum(jf.Num())
-					rec.setSeqNum(db.seq)
-					if err := db.s.commit(rec); err != nil {
-						return err
-					}
-					rec.resetAddedTables()
-					of.Remove()
-					of = nil
-				}
-
-				// Replay journal to memdb.
-				mdb.Reset()
-				for {
-					r, err := jr.Next()
-					if err != nil {
-						if err == io.EOF {
-							break
-						}
-						return errors.SetFile(err, jf)
-					}
-
-					buf.Reset()
-					if _, err := buf.ReadFrom(r); err != nil {
-						if err == io.ErrUnexpectedEOF {
-							// This is error returned due to corruption, with strict == false.
-							continue
-						} else {
-							return errors.SetFile(err, jf)
-						}
-					}
-					if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
-						if strict || !errors.IsCorrupted(err) {
-							return errors.SetFile(err, jf)
-						} else {
-							db.s.logf("journal error: %v (skipped)", err)
-							// We won't apply sequence number as it might be corrupted.
-							continue
-						}
-					}
-
-					// Save sequence number.
-					db.seq = batch.seq + uint64(batch.Len())
-
-					// Flush it if large enough.
-					if mdb.Size() >= writeBuffer {
-						if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
-							return err
-						}
-						mdb.Reset()
-					}
-				}
-
-				of = jf
-				return nil
-			}()
+			fr, err := jf.Open()
 			if err != nil {
 				return err
 			}
+
+			// Create or reset journal reader instance.
+			if jr == nil {
+				jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+			} else {
+				jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+			}
+
+			// Flush memdb and remove obsolete journal file.
+			if of != nil {
+				if mdb.Len() > 0 {
+					if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
+						fr.Close()
+						return err
+					}
+				}
+
+				rec.setJournalNum(jf.Num())
+				rec.setSeqNum(db.seq)
+				if err := db.s.commit(rec); err != nil {
+					fr.Close()
+					return err
+				}
+				rec.resetAddedTables()
+
+				of.Remove()
+				of = nil
+			}
+
+			// Replay journal to memdb.
+			mdb.Reset()
+			for {
+				r, err := jr.Next()
+				if err != nil {
+					if err == io.EOF {
+						break
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+
+				buf.Reset()
+				if _, err := buf.ReadFrom(r); err != nil {
+					if err == io.ErrUnexpectedEOF {
+						// This is error returned due to corruption, with strict == false.
+						continue
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+				if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+					if !strict && errors.IsCorrupted(err) {
+						db.s.logf("journal error: %v (skipped)", err)
+						// We won't apply sequence number as it might be corrupted.
+						continue
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+
+				// Save sequence number.
+				db.seq = batch.seq + uint64(batch.Len())
+
+				// Flush it if large enough.
+				if mdb.Size() >= writeBuffer {
+					if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
+						fr.Close()
+						return err
+					}
+
+					mdb.Reset()
+				}
+			}
+
+			fr.Close()
+			of = jf
 		}
 
 		// Flush the last memdb.
@@ -606,6 +627,103 @@
 	return nil
 }
 
+func (db *DB) recoverJournalRO() error {
+	// Get all journals and sort it by file number.
+	allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
+	if err != nil {
+		return err
+	}
+	files(allJournalFiles).sort()
+
+	// Journals that will be recovered.
+	var recJournalFiles []storage.File
+	for _, jf := range allJournalFiles {
+		if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
+			recJournalFiles = append(recJournalFiles, jf)
+		}
+	}
+
+	var (
+		// Options.
+		strict      = db.s.o.GetStrict(opt.StrictJournal)
+		checksum    = db.s.o.GetStrict(opt.StrictJournalChecksum)
+		writeBuffer = db.s.o.GetWriteBuffer()
+
+		mdb = memdb.New(db.s.icmp, writeBuffer)
+	)
+
+	// Recover journals.
+	if len(recJournalFiles) > 0 {
+		db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
+
+		var (
+			jr    *journal.Reader
+			buf   = &util.Buffer{}
+			batch = &Batch{}
+		)
+
+		for _, jf := range recJournalFiles {
+			db.logf("journal@recovery recovering @%d", jf.Num())
+
+			fr, err := jf.Open()
+			if err != nil {
+				return err
+			}
+
+			// Create or reset journal reader instance.
+			if jr == nil {
+				jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
+			} else {
+				jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
+			}
+
+			// Replay journal to memdb.
+			for {
+				r, err := jr.Next()
+				if err != nil {
+					if err == io.EOF {
+						break
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+
+				buf.Reset()
+				if _, err := buf.ReadFrom(r); err != nil {
+					if err == io.ErrUnexpectedEOF {
+						// This is error returned due to corruption, with strict == false.
+						continue
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+				if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
+					if !strict && errors.IsCorrupted(err) {
+						db.s.logf("journal error: %v (skipped)", err)
+						// We won't apply sequence number as it might be corrupted.
+						continue
+					}
+
+					fr.Close()
+					return errors.SetFile(err, jf)
+				}
+
+				// Save sequence number.
+				db.seq = batch.seq + uint64(batch.Len())
+			}
+
+			fr.Close()
+		}
+	}
+
+	// Set memDB.
+	db.mem = &memDB{db: db, DB: mdb, ref: 1}
+
+	return nil
+}
+
 func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
 	ikey := newIkey(key, seq, ktSeek)
 
@@ -902,6 +1020,9 @@
 	var err error
 	select {
 	case err = <-db.compErrC:
+		if err == ErrReadOnly {
+			err = nil
+		}
 	default:
 	}
 
diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go
index 3b4c469..2600310 100644
--- a/leveldb/db_compaction.go
+++ b/leveldb/db_compaction.go
@@ -62,10 +62,7 @@
 }
 
 func (db *DB) compactionError() {
-	var (
-		err     error
-		wlocked bool
-	)
+	var err error
 noerr:
 	// No error.
 	for {
@@ -73,7 +70,7 @@
 		case err = <-db.compErrSetC:
 			switch {
 			case err == nil:
-			case errors.IsCorrupted(err):
+			case err == ErrReadOnly, errors.IsCorrupted(err):
 				goto hasperr
 			default:
 				goto haserr
@@ -91,7 +88,7 @@
 			switch {
 			case err == nil:
 				goto noerr
-			case errors.IsCorrupted(err):
+			case err == ErrReadOnly, errors.IsCorrupted(err):
 				goto hasperr
 			default:
 			}
@@ -107,9 +104,9 @@
 		case db.compPerErrC <- err:
 		case db.writeLockC <- struct{}{}:
 			// Hold write lock, so that write won't pass-through.
-			wlocked = true
+			db.compWriteLocking = true
 		case _, _ = <-db.closeC:
-			if wlocked {
+			if db.compWriteLocking {
 				// We should release the lock or Close will hang.
 				<-db.writeLockC
 			}
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 2f60d17..9d91ebf 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2663,3 +2663,39 @@
 func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
 	testDB_IterTriggeredCompaction(t, 2)
 }
+
+func TestDB_ReadOnly(t *testing.T) {
+	h := newDbHarness(t)
+	defer h.close()
+
+	h.put("foo", "v1")
+	h.put("bar", "v2")
+	h.compactMem()
+
+	h.put("xfoo", "v1")
+	h.put("xbar", "v2")
+
+	t.Log("Trigger read-only")
+	if err := h.db.SetReadOnly(); err != nil {
+		h.close()
+		t.Fatalf("SetReadOnly error: %v", err)
+	}
+
+	h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync)
+
+	ro := func(key, value, wantValue string) {
+		if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		h.getVal(key, wantValue)
+	}
+
+	ro("foo", "vx", "v1")
+
+	h.o.ReadOnly = true
+	h.reopenDB()
+
+	ro("foo", "vx", "v1")
+	ro("bar", "vx", "v2")
+	h.assertNumKeys(4)
+}
diff --git a/leveldb/db_write.go b/leveldb/db_write.go
index 1e412b3..99ee85d 100644
--- a/leveldb/db_write.go
+++ b/leveldb/db_write.go
@@ -309,3 +309,31 @@
 	// Table compaction.
 	return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
 }
+
+// SetReadOnly makes DB read-only. It will stay read-only until reopened.
+func (db *DB) SetReadOnly() error {
+	if err := db.ok(); err != nil {
+		return err
+	}
+
+	// Lock writer.
+	select {
+	case db.writeLockC <- struct{}{}:
+		db.compWriteLocking = true
+	case err := <-db.compPerErrC:
+		return err
+	case _, _ = <-db.closeC:
+		return ErrClosed
+	}
+
+	// Set compaction read-only.
+	select {
+	case db.compErrSetC <- ErrReadOnly:
+	case perr := <-db.compPerErrC:
+		return perr
+	case _, _ = <-db.closeC:
+		return ErrClosed
+	}
+
+	return nil
+}
diff --git a/leveldb/errors.go b/leveldb/errors.go
index 29d0d2f..c8bd66a 100644
--- a/leveldb/errors.go
+++ b/leveldb/errors.go
@@ -12,6 +12,7 @@
 
 var (
 	ErrNotFound         = errors.ErrNotFound
+	ErrReadOnly         = errors.New("leveldb: read-only mode")
 	ErrSnapshotReleased = errors.New("leveldb: snapshot released")
 	ErrIterReleased     = errors.New("leveldb: iterator released")
 	ErrClosed           = errors.New("leveldb: closed")
diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go
index 4464c4f..f9a309d 100644
--- a/leveldb/opt/options.go
+++ b/leveldb/opt/options.go
@@ -326,6 +326,11 @@
 	// The default value is 500.
 	OpenFilesCacheCapacity int
 
+	// If true then opens DB in read-only mode.
+	//
+	// The default value is false.
+	ReadOnly bool
+
 	// Strict defines the DB strict level.
 	Strict Strict
 
@@ -567,6 +572,13 @@
 	return o.OpenFilesCacheCapacity
 }
 
+func (o *Options) GetReadOnly() bool {
+	if o == nil {
+		return false
+	}
+	return o.ReadOnly
+}
+
 func (o *Options) GetStrict(strict Strict) bool {
 	if o == nil || o.Strict == 0 {
 		return DefaultStrict&strict != 0
diff --git a/leveldb/storage_test.go b/leveldb/storage_test.go
index dc1f1fb..08be0ba 100644
--- a/leveldb/storage_test.go
+++ b/leveldb/storage_test.go
@@ -42,6 +42,8 @@
 const (
 	tsOpOpen tsOp = iota
 	tsOpCreate
+	tsOpReplace
+	tsOpRemove
 	tsOpRead
 	tsOpReadAt
 	tsOpWrite
@@ -241,6 +243,10 @@
 	if err != nil {
 		return
 	}
+	if tf.shouldErr(tsOpReplace) {
+		err = errors.New("leveldb.testStorage: emulated create error")
+		return
+	}
 	err = tf.File.Replace(newfile.(tsFile).File)
 	if err != nil {
 		ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
@@ -258,6 +264,10 @@
 	if err != nil {
 		return
 	}
+	if tf.shouldErr(tsOpRemove) {
+		err = errors.New("leveldb.testStorage: emulated create error")
+		return
+	}
 	err = tf.File.Remove()
 	if err != nil {
 		ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)