| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "time" |
| |
| "github.com/syndtr/goleveldb/leveldb/memdb" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
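| // writeJournal writes the encoded batch to the current journal, flushes the |
| // journal writer and, when the batch demands it, syncs the underlying file. |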
| func (db *DB) writeJournal(b *Batch) error { |
| w, err := db.journal.Next() |
| if err != nil { |
| return err |
| } |
| if _, err := w.Write(b.encode()); err != nil { |
| return err |
| } |
| if err := db.journal.Flush(); err != nil { |
| return err |
| } |
| if b.sync { |
| return db.journalWriter.Sync() |
| } |
| return nil |
| } |
| |
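| // jWriter is the journal writer goroutine: it receives batches from journalC, |
| // writes them with writeJournal and reports the result on journalAckC. It |
| // exits when closeC is closed. |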
| func (db *DB) jWriter() { |
| defer db.closeW.Done() |
| for { |
| select { |
| case b := <-db.journalC: |
| if b != nil { |
| db.journalAckC <- db.writeJournal(b) |
| } |
| case <-db.closeC: |
| return |
| } |
| } |
| } |
| |
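| // rotateMem waits for any pending memdb compaction, replaces the current |
| // memdb and journal with fresh ones (requesting room for at least n bytes), |
| // and then triggers compaction of the frozen memdb. When wait is true it |
| // blocks until that compaction has finished. |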
| func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { |
| // Wait for pending memdb compaction. |
| err = db.compTriggerWait(db.mcompCmdC) |
| if err != nil { |
| return |
| } |
| |
| // Create new memdb and journal. |
| mem, err = db.newMem(n) |
| if err != nil { |
| return |
| } |
| |
| // Schedule memdb compaction. |
| if wait { |
| err = db.compTriggerWait(db.mcompCmdC) |
| } else { |
| db.compTrigger(db.mcompCmdC) |
| } |
| return |
| } |
| |
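| // flush makes sure the effective memdb has at least n bytes of free space, |
| // applying the configured L0 slowdown/pause throttling and rotating the |
| // memdb when it cannot hold n more bytes. It returns the effective memdb |
| // (with its reference count incremented) together with its free space. |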
| func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { |
| delayed := false |
| flush := func() (retry bool) { |
| v := db.s.version() |
| defer v.release() |
| mdb = db.getEffectiveMem() |
| defer func() { |
| if retry { |
| mdb.decref() |
| mdb = nil |
| } |
| }() |
| mdbFree = mdb.Free() |
| switch { |
| case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: |
| delayed = true |
| time.Sleep(time.Millisecond) |
| case mdbFree >= n: |
| return false |
| case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): |
| delayed = true |
| err = db.compTriggerWait(db.tcompCmdC) |
| if err != nil { |
| return false |
| } |
| default: |
| // Allow the memdb to grow if it has no entries. |
| if mdb.Len() == 0 { |
| mdbFree = n |
| } else { |
| mdb.decref() |
| mdb, err = db.rotateMem(n, false) |
| if err == nil { |
| mdbFree = mdb.Free() |
| } else { |
| mdbFree = 0 |
| } |
| } |
| return false |
| } |
| return true |
| } |
| start := time.Now() |
| for flush() { |
| } |
| if delayed { |
| db.writeDelay += time.Since(start) |
| db.writeDelayN++ |
| } else if db.writeDelayN > 0 { |
| db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay) |
| db.writeDelay = 0 |
| db.writeDelayN = 0 |
| } |
| return |
| } |
| |
| // Write applies the given batch to the DB. The batch records will be |
| // applied sequentially. |
| // |
| // It is safe to modify the contents of the arguments after Write returns. |
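| // |
| // A minimal usage sketch from a client package (db is assumed to be an |
| // already opened *leveldb.DB; passing nil write options uses the defaults): |
| // |
| //	batch := new(leveldb.Batch) |
| //	batch.Put([]byte("foo"), []byte("value-1")) |
| //	batch.Delete([]byte("stale-key")) |
| //	if err := db.Write(batch, nil); err != nil { |
| //		// handle the write error |
| //	} |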
| func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { |
| err = db.ok() |
| if err != nil || b == nil || b.Len() == 0 { |
| return |
| } |
| |
| b.init(wo.GetSync() && !db.s.o.GetNoSync()) |
| |
| if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { |
| // Write the large batch using a transaction. |
| tr, err1 := db.OpenTransaction() |
| if err1 != nil { |
| return err1 |
| } |
| if err1 := tr.Write(b, wo); err1 != nil { |
| tr.Discard() |
| return err1 |
| } |
| return tr.Commit() |
| } |
| |
| // The write happens synchronously. |
| select { |
| case db.writeC <- b: |
| if <-db.writeMergedC { |
| return <-db.writeAckC |
| } |
| // Continue; the write lock was already acquired by the previous writer |
| // and handed over to us. |
| case db.writeLockC <- struct{}{}: |
| case err = <-db.compPerErrC: |
| return |
| case <-db.closeC: |
| return ErrClosed |
| } |
| |
| merged := 0 |
| danglingMerge := false |
| defer func() { |
| for i := 0; i < merged; i++ { |
| db.writeAckC <- err |
| } |
| if danglingMerge { |
| // Only one dangling merge at most, so this is safe. |
| db.writeMergedC <- false |
| } else { |
| <-db.writeLockC |
| } |
| }() |
| |
| mdb, mdbFree, err := db.flush(b.size()) |
| if err != nil { |
| return |
| } |
| defer mdb.decref() |
| |
| // Calculate the maximum size of the merged batch. |
| m := 1 << 20 |
| if x := b.size(); x <= 128<<10 { |
| m = x + (128 << 10) |
| } |
| m = minInt(m, mdbFree) |
| |
| // Merge with other batch. |
| drain: |
| for b.size() < m && !b.sync { |
| select { |
| case nb := <-db.writeC: |
| if b.size()+nb.size() <= m { |
| b.append(nb) |
| db.writeMergedC <- true |
| merged++ |
| } else { |
| danglingMerge = true |
| break drain |
| } |
| default: |
| break drain |
| } |
| } |
| |
| // Set the batch's first sequence number relative to the last sequence number. |
| b.seq = db.seq + 1 |
| |
| // Write the journal concurrently if the batch is large enough. |
| if b.size() >= (128 << 10) { |
| // Push the write batch to the journal writer |
| select { |
| case db.journalC <- b: |
| // Write into memdb |
| if berr := b.memReplay(mdb.DB); berr != nil { |
| panic(berr) |
| } |
| case err = <-db.compPerErrC: |
| return |
| case <-db.closeC: |
| err = ErrClosed |
| return |
| } |
| // Wait for journal writer |
| select { |
| case err = <-db.journalAckC: |
| if err != nil { |
| // Revert memdb if error detected |
| if berr := b.revertMemReplay(mdb.DB); berr != nil { |
| panic(berr) |
| } |
| return |
| } |
| case <-db.closeC: |
| err = ErrClosed |
| return |
| } |
| } else { |
| err = db.writeJournal(b) |
| if err != nil { |
| return |
| } |
| if berr := b.memReplay(mdb.DB); berr != nil { |
| panic(berr) |
| } |
| } |
| |
| // Set last seq number. |
| db.addSeq(uint64(b.Len())) |
| |
| if b.size() >= mdbFree { |
| db.rotateMem(0, false) |
| } |
| return |
| } |
| |
| // Put sets the value for the given key. It overwrites any previous value |
| // for that key; a DB is not a multi-map. |
| // |
| // It is safe to modify the contents of the arguments after Put returns. |
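| // |
| // A minimal usage sketch (db is assumed to be an already opened *leveldb.DB; |
| // passing nil write options uses the defaults): |
| // |
| //	if err := db.Put([]byte("foo"), []byte("value-1"), nil); err != nil { |
| //		// handle the write error |
| //	} |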
| func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { |
| b := new(Batch) |
| b.Put(key, value) |
| return db.Write(b, wo) |
| } |
| |
| // Delete deletes the value for the given key. |
| // |
| // It is safe to modify the contents of the arguments after Delete returns. |
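| // |
| // A minimal usage sketch (db is assumed to be an already opened *leveldb.DB): |
| // |
| //	if err := db.Delete([]byte("foo"), nil); err != nil { |
| //		// handle the write error |
| //	} |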
| func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { |
| b := new(Batch) |
| b.Delete(key) |
| return db.Write(b, wo) |
| } |
| |
| func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { |
| iter := mem.NewIterator(nil) |
| defer iter.Release() |
| return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) && |
| (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0)) |
| } |
| |
| // CompactRange compacts the underlying DB for the given key range. |
| // In particular, deleted and overwritten versions are discarded, |
| // and the data is rearranged to reduce the cost of operations |
| // needed to access the data. This operation should typically only |
| // be invoked by users who understand the underlying implementation. |
| // |
| // A nil Range.Start is treated as a key before all keys in the DB, |
| // and a nil Range.Limit is treated as a key after all keys in the DB. |
| // Therefore, if both are nil the entire DB will be compacted. |
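| // |
| // A minimal usage sketch (db is assumed to be an already opened *leveldb.DB); |
| // a zero util.Range has nil Start and Limit, so this compacts the entire DB: |
| // |
| //	if err := db.CompactRange(util.Range{}); err != nil { |
| //		// handle the compaction error |
| //	} |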
| func (db *DB) CompactRange(r util.Range) error { |
| if err := db.ok(); err != nil { |
| return err |
| } |
| |
| // Lock writer. |
| select { |
| case db.writeLockC <- struct{}{}: |
| case err := <-db.compPerErrC: |
| return err |
| case <-db.closeC: |
| return ErrClosed |
| } |
| |
| // Check for overlaps in memdb. |
| mdb := db.getEffectiveMem() |
| defer mdb.decref() |
| if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { |
| // Memdb compaction. |
| if _, err := db.rotateMem(0, false); err != nil { |
| <-db.writeLockC |
| return err |
| } |
| <-db.writeLockC |
| if err := db.compTriggerWait(db.mcompCmdC); err != nil { |
| return err |
| } |
| } else { |
| <-db.writeLockC |
| } |
| |
| // Table compaction. |
| return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit) |
| } |
| |
| // SetReadOnly makes the DB read-only. It will stay read-only until reopened. |
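| // |
| // A minimal usage sketch (db is assumed to be an already opened *leveldb.DB; |
| // after a successful call, later writes are expected to fail with ErrReadOnly): |
| // |
| //	if err := db.SetReadOnly(); err != nil { |
| //		// handle the error |
| //	} |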
| func (db *DB) SetReadOnly() error { |
| if err := db.ok(); err != nil { |
| return err |
| } |
| |
| // Lock writer. |
| select { |
| case db.writeLockC <- struct{}{}: |
| db.compWriteLocking = true |
| case err := <-db.compPerErrC: |
| return err |
| case <-db.closeC: |
| return ErrClosed |
| } |
| |
| // Set compaction read-only. |
| select { |
| case db.compErrSetC <- ErrReadOnly: |
| case perr := <-db.compPerErrC: |
| return perr |
| case <-db.closeC: |
| return ErrClosed |
| } |
| |
| return nil |
| } |