| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "time" |
| |
| "github.com/syndtr/goleveldb/leveldb/memdb" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
| func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error { |
| wr, err := db.journal.Next() |
| if err != nil { |
| return err |
| } |
| if err := writeBatchesWithHeader(wr, batches, seq); err != nil { |
| return err |
| } |
| if err := db.journal.Flush(); err != nil { |
| return err |
| } |
| if sync { |
| return db.journalWriter.Sync() |
| } |
| return nil |
| } |
| |
| func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { |
| // Wait for pending memdb compaction. |
| err = db.compTriggerWait(db.mcompCmdC) |
| if err != nil { |
| return |
| } |
| |
| // Create new memdb and journal. |
| mem, err = db.newMem(n) |
| if err != nil { |
| return |
| } |
| |
| // Schedule memdb compaction. |
| if wait { |
| err = db.compTriggerWait(db.mcompCmdC) |
| } else { |
| db.compTrigger(db.mcompCmdC) |
| } |
| return |
| } |
| |
| func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { |
| delayed := false |
| flush := func() (retry bool) { |
| v := db.s.version() |
| defer v.release() |
| mdb = db.getEffectiveMem() |
| if mdb == nil { |
| err = ErrClosed |
| return false |
| } |
| defer func() { |
| if retry { |
| mdb.decref() |
| mdb = nil |
| } |
| }() |
| mdbFree = mdb.Free() |
| switch { |
| case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: |
| delayed = true |
| time.Sleep(time.Millisecond) |
| case mdbFree >= n: |
| return false |
| case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): |
| delayed = true |
| err = db.compTriggerWait(db.tcompCmdC) |
| if err != nil { |
| return false |
| } |
| default: |
| // Allow memdb to grow if it has no entry. |
| if mdb.Len() == 0 { |
| mdbFree = n |
| } else { |
| mdb.decref() |
| mdb, err = db.rotateMem(n, false) |
| if err == nil { |
| mdbFree = mdb.Free() |
| } else { |
| mdbFree = 0 |
| } |
| } |
| return false |
| } |
| return true |
| } |
| start := time.Now() |
| for flush() { |
| } |
| if delayed { |
| db.writeDelay += time.Since(start) |
| db.writeDelayN++ |
| } else if db.writeDelayN > 0 { |
| db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay) |
| db.writeDelay = 0 |
| db.writeDelayN = 0 |
| } |
| return |
| } |
| |
| type writeMerge struct { |
| sync bool |
| batch *Batch |
| keyType keyType |
| key, value []byte |
| } |
| |
| func (db *DB) unlockWrite(overflow bool, merged int, err error) { |
| for i := 0; i < merged; i++ { |
| db.writeAckC <- err |
| } |
| if overflow { |
| // Pass lock to the next write (that failed to merge). |
| db.writeMergedC <- false |
| } else { |
| // Release lock. |
| <-db.writeLockC |
| } |
| } |
| |
| // ourBatch if defined should equal with batch. |
| func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { |
| // Try to flush memdb. This method would also trying to throttle writes |
| // if it is too fast and compaction cannot catch-up. |
| mdb, mdbFree, err := db.flush(batch.internalLen) |
| if err != nil { |
| db.unlockWrite(false, 0, err) |
| return err |
| } |
| defer mdb.decref() |
| |
| var ( |
| overflow bool |
| merged int |
| batches = []*Batch{batch} |
| ) |
| |
| if merge { |
| // Merge limit. |
| var mergeLimit int |
| if batch.internalLen > 128<<10 { |
| mergeLimit = (1 << 20) - batch.internalLen |
| } else { |
| mergeLimit = 128 << 10 |
| } |
| mergeCap := mdbFree - batch.internalLen |
| if mergeLimit > mergeCap { |
| mergeLimit = mergeCap |
| } |
| |
| merge: |
| for mergeLimit > 0 { |
| select { |
| case incoming := <-db.writeMergeC: |
| if incoming.batch != nil { |
| // Merge batch. |
| if incoming.batch.internalLen > mergeLimit { |
| overflow = true |
| break merge |
| } |
| batches = append(batches, incoming.batch) |
| mergeLimit -= incoming.batch.internalLen |
| } else { |
| // Merge put. |
| internalLen := len(incoming.key) + len(incoming.value) + 8 |
| if internalLen > mergeLimit { |
| overflow = true |
| break merge |
| } |
| if ourBatch == nil { |
| ourBatch = db.batchPool.Get().(*Batch) |
| ourBatch.Reset() |
| batches = append(batches, ourBatch) |
| } |
| // We can use same batch since concurrent write doesn't |
| // guarantee write order. |
| ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value) |
| mergeLimit -= internalLen |
| } |
| sync = sync || incoming.sync |
| merged++ |
| db.writeMergedC <- true |
| |
| default: |
| break merge |
| } |
| } |
| } |
| |
| // Seq number. |
| seq := db.seq + 1 |
| |
| // Write journal. |
| if err := db.writeJournal(batches, seq, sync); err != nil { |
| db.unlockWrite(overflow, merged, err) |
| return err |
| } |
| |
| // Put batches. |
| for _, batch := range batches { |
| if err := batch.putMem(seq, mdb.DB); err != nil { |
| panic(err) |
| } |
| seq += uint64(batch.Len()) |
| } |
| |
| // Incr seq number. |
| db.addSeq(uint64(batchesLen(batches))) |
| |
| // Rotate memdb if it's reach the threshold. |
| if batch.internalLen >= mdbFree { |
| db.rotateMem(0, false) |
| } |
| |
| db.unlockWrite(overflow, merged, nil) |
| return nil |
| } |
| |
| // Write apply the given batch to the DB. The batch records will be applied |
| // sequentially. Write might be used concurrently, when used concurrently and |
| // batch is small enough, write will try to merge the batches. Set NoWriteMerge |
| // option to true to disable write merge. |
| // |
| // It is safe to modify the contents of the arguments after Write returns but |
| // not before. Write will not modify content of the batch. |
| func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error { |
| if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 { |
| return err |
| } |
| |
| // If the batch size is larger than write buffer, it may justified to write |
| // using transaction instead. Using transaction the batch will be written |
| // into tables directly, skipping the journaling. |
| if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { |
| tr, err := db.OpenTransaction() |
| if err != nil { |
| return err |
| } |
| if err := tr.Write(batch, wo); err != nil { |
| tr.Discard() |
| return err |
| } |
| return tr.Commit() |
| } |
| |
| merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() |
| sync := wo.GetSync() && !db.s.o.GetNoSync() |
| |
| // Acquire write lock. |
| if merge { |
| select { |
| case db.writeMergeC <- writeMerge{sync: sync, batch: batch}: |
| if <-db.writeMergedC { |
| // Write is merged. |
| return <-db.writeAckC |
| } |
| // Write is not merged, the write lock is handed to us. Continue. |
| case db.writeLockC <- struct{}{}: |
| // Write lock acquired. |
| case err := <-db.compPerErrC: |
| // Compaction error. |
| return err |
| case <-db.closeC: |
| // Closed |
| return ErrClosed |
| } |
| } else { |
| select { |
| case db.writeLockC <- struct{}{}: |
| // Write lock acquired. |
| case err := <-db.compPerErrC: |
| // Compaction error. |
| return err |
| case <-db.closeC: |
| // Closed |
| return ErrClosed |
| } |
| } |
| |
| return db.writeLocked(batch, nil, merge, sync) |
| } |
| |
| func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error { |
| if err := db.ok(); err != nil { |
| return err |
| } |
| |
| merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge() |
| sync := wo.GetSync() && !db.s.o.GetNoSync() |
| |
| // Acquire write lock. |
| if merge { |
| select { |
| case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}: |
| if <-db.writeMergedC { |
| // Write is merged. |
| return <-db.writeAckC |
| } |
| // Write is not merged, the write lock is handed to us. Continue. |
| case db.writeLockC <- struct{}{}: |
| // Write lock acquired. |
| case err := <-db.compPerErrC: |
| // Compaction error. |
| return err |
| case <-db.closeC: |
| // Closed |
| return ErrClosed |
| } |
| } else { |
| select { |
| case db.writeLockC <- struct{}{}: |
| // Write lock acquired. |
| case err := <-db.compPerErrC: |
| // Compaction error. |
| return err |
| case <-db.closeC: |
| // Closed |
| return ErrClosed |
| } |
| } |
| |
| batch := db.batchPool.Get().(*Batch) |
| batch.Reset() |
| batch.appendRec(kt, key, value) |
| return db.writeLocked(batch, batch, merge, sync) |
| } |
| |
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies to Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}
| |
// Delete deletes the value for the given key. Delete does not return an error
// if the key doesn't exist. Write merge also applies to Delete, see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}
| |
| func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { |
| iter := mem.NewIterator(nil) |
| defer iter.Release() |
| return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) && |
| (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0)) |
| } |
| |
| // CompactRange compacts the underlying DB for the given key range. |
| // In particular, deleted and overwritten versions are discarded, |
| // and the data is rearranged to reduce the cost of operations |
| // needed to access the data. This operation should typically only |
| // be invoked by users who understand the underlying implementation. |
| // |
| // A nil Range.Start is treated as a key before all keys in the DB. |
| // And a nil Range.Limit is treated as a key after all keys in the DB. |
| // Therefore if both is nil then it will compact entire DB. |
| func (db *DB) CompactRange(r util.Range) error { |
| if err := db.ok(); err != nil { |
| return err |
| } |
| |
| // Lock writer. |
| select { |
| case db.writeLockC <- struct{}{}: |
| case err := <-db.compPerErrC: |
| return err |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| |
| // Check for overlaps in memdb. |
| mdb := db.getEffectiveMem() |
| if mdb == nil { |
| return ErrClosed |
| } |
| defer mdb.decref() |
| if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { |
| // Memdb compaction. |
| if _, err := db.rotateMem(0, false); err != nil { |
| <-db.writeLockC |
| return err |
| } |
| <-db.writeLockC |
| if err := db.compTriggerWait(db.mcompCmdC); err != nil { |
| return err |
| } |
| } else { |
| <-db.writeLockC |
| } |
| |
| // Table compaction. |
| return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit) |
| } |
| |
| // SetReadOnly makes DB read-only. It will stay read-only until reopened. |
| func (db *DB) SetReadOnly() error { |
| if err := db.ok(); err != nil { |
| return err |
| } |
| |
| // Lock writer. |
| select { |
| case db.writeLockC <- struct{}{}: |
| db.compWriteLocking = true |
| case err := <-db.compPerErrC: |
| return err |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| |
| // Set compaction read-only. |
| select { |
| case db.compErrSetC <- ErrReadOnly: |
| case perr := <-db.compPerErrC: |
| return perr |
| case _, _ = <-db.closeC: |
| return ErrClosed |
| } |
| |
| return nil |
| } |