| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package leveldb |
| |
| import ( |
| "encoding/binary" |
| "fmt" |
| "io" |
| |
| "github.com/syndtr/goleveldb/leveldb/errors" |
| "github.com/syndtr/goleveldb/leveldb/memdb" |
| "github.com/syndtr/goleveldb/leveldb/storage" |
| ) |
| |
// ErrBatchCorrupted describes why a batch was judged corrupted. Errors of
// this type are produced wrapped by errors.NewErrCorrupted.
type ErrBatchCorrupted struct {
	// Reason is a short human-readable description of the corruption.
	Reason string
}

// Error implements the error interface, prefixing Reason with a fixed
// "leveldb: batch corrupted" tag.
func (e *ErrBatchCorrupted) Error() string {
	msg := fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
	return msg
}
| |
| func newErrBatchCorrupted(reason string) error { |
| return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason}) |
| } |
| |
const (
	// batchHeaderLen is the size in bytes of the batch header: an 8-byte
	// sequence number followed by a 4-byte record count.
	batchHeaderLen = 8 + 4

	// batchGrowRec is the record-count threshold above which Batch.grow
	// scales down the extra slack it allocates when enlarging the buffer.
	batchGrowRec = 3000

	// batchBufioSize is not referenced in this part of the file.
	// NOTE(review): presumably a buffered I/O size used elsewhere — confirm.
	batchBufioSize = 16
)
| |
// BatchReplay is the set of basic batch operations. Batch.Replay calls these
// methods, one call per record, to hand a batch's contents to the
// implementation.
type BatchReplay interface {
	// Put receives a 'put' record with its key and value.
	Put(key, value []byte)

	// Delete receives a 'delete' record with its key.
	Delete(key []byte)
}
| |
| type batchIndex struct { |
| keyType keyType |
| keyPos, keyLen int |
| valuePos, valueLen int |
| } |
| |
| func (index batchIndex) k(data []byte) []byte { |
| return data[index.keyPos : index.keyPos+index.keyLen] |
| } |
| |
| func (index batchIndex) v(data []byte) []byte { |
| if index.valueLen != 0 { |
| return data[index.valuePos : index.valuePos+index.valueLen] |
| } |
| return nil |
| } |
| |
| func (index batchIndex) kv(data []byte) (key, value []byte) { |
| return index.k(data), index.v(data) |
| } |
| |
| // Batch is a write batch. |
| type Batch struct { |
| data []byte |
| index []batchIndex |
| |
| // internalLen is sums of key/value pair length plus 8-bytes internal key. |
| internalLen int |
| } |
| |
| func (b *Batch) grow(n int) { |
| o := len(b.data) |
| if cap(b.data)-o < n { |
| div := 1 |
| if len(b.index) > batchGrowRec { |
| div = len(b.index) / batchGrowRec |
| } |
| ndata := make([]byte, o, o+n+o/div) |
| copy(ndata, b.data) |
| b.data = ndata |
| } |
| } |
| |
| func (b *Batch) appendRec(kt keyType, key, value []byte) { |
| n := 1 + binary.MaxVarintLen32 + len(key) |
| if kt == keyTypeVal { |
| n += binary.MaxVarintLen32 + len(value) |
| } |
| b.grow(n) |
| index := batchIndex{keyType: kt} |
| o := len(b.data) |
| data := b.data[:o+n] |
| data[o] = byte(kt) |
| o++ |
| o += binary.PutUvarint(data[o:], uint64(len(key))) |
| index.keyPos = o |
| index.keyLen = len(key) |
| o += copy(data[o:], key) |
| if kt == keyTypeVal { |
| o += binary.PutUvarint(data[o:], uint64(len(value))) |
| index.valuePos = o |
| index.valueLen = len(value) |
| o += copy(data[o:], value) |
| } |
| b.data = data[:o] |
| b.index = append(b.index, index) |
| b.internalLen += index.keyLen + index.valueLen + 8 |
| } |
| |
| // Put appends 'put operation' of the given key/value pair to the batch. |
| // It is safe to modify the contents of the argument after Put returns but not |
| // before. |
| func (b *Batch) Put(key, value []byte) { |
| b.appendRec(keyTypeVal, key, value) |
| } |
| |
| // Delete appends 'delete operation' of the given key to the batch. |
| // It is safe to modify the contents of the argument after Delete returns but |
| // not before. |
| func (b *Batch) Delete(key []byte) { |
| b.appendRec(keyTypeDel, key, nil) |
| } |
| |
| // Dump dumps batch contents. The returned slice can be loaded into the |
| // batch using Load method. |
| // The returned slice is not its own copy, so the contents should not be |
| // modified. |
| func (b *Batch) Dump() []byte { |
| return b.data |
| } |
| |
| // Load loads given slice into the batch. Previous contents of the batch |
| // will be discarded. |
| // The given slice will not be copied and will be used as batch buffer, so |
| // it is not safe to modify the contents of the slice. |
| func (b *Batch) Load(data []byte) error { |
| return b.decode(data, -1) |
| } |
| |
| // Replay replays batch contents. |
| func (b *Batch) Replay(r BatchReplay) error { |
| for _, index := range b.index { |
| switch index.keyType { |
| case keyTypeVal: |
| r.Put(index.k(b.data), index.v(b.data)) |
| case keyTypeDel: |
| r.Delete(index.k(b.data)) |
| } |
| } |
| return nil |
| } |
| |
| // Len returns number of records in the batch. |
| func (b *Batch) Len() int { |
| return len(b.index) |
| } |
| |
| // Reset resets the batch. |
| func (b *Batch) Reset() { |
| b.data = b.data[:0] |
| b.index = b.index[:0] |
| b.internalLen = 0 |
| } |
| |
| func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error { |
| for i, index := range b.index { |
| if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (b *Batch) append(p *Batch) { |
| ob := len(b.data) |
| oi := len(b.index) |
| b.data = append(b.data, p.data...) |
| b.index = append(b.index, p.index...) |
| b.internalLen += p.internalLen |
| |
| // Updating index offset. |
| if ob != 0 { |
| for ; oi < len(b.index); oi++ { |
| index := &b.index[oi] |
| index.keyPos += ob |
| if index.valueLen != 0 { |
| index.valuePos += ob |
| } |
| } |
| } |
| } |
| |
| func (b *Batch) decode(data []byte, expectedLen int) error { |
| b.data = data |
| b.index = b.index[:0] |
| b.internalLen = 0 |
| err := decodeBatch(data, func(i int, index batchIndex) error { |
| b.index = append(b.index, index) |
| b.internalLen += index.keyLen + index.valueLen + 8 |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| if expectedLen >= 0 && len(b.index) != expectedLen { |
| return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index))) |
| } |
| return nil |
| } |
| |
| func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { |
| var ik []byte |
| for i, index := range b.index { |
| ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) |
| if err := mdb.Put(ik, index.v(b.data)); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { |
| var ik []byte |
| for i, index := range b.index { |
| ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) |
| if err := mdb.Delete(ik); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func newBatch() interface{} { |
| return &Batch{} |
| } |
| |
| func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { |
| var index batchIndex |
| for i, o := 0, 0; o < len(data); i++ { |
| // Key type. |
| index.keyType = keyType(data[o]) |
| if index.keyType > keyTypeVal { |
| return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType))) |
| } |
| o++ |
| |
| // Key. |
| x, n := binary.Uvarint(data[o:]) |
| o += n |
| if n <= 0 || o+int(x) > len(data) { |
| return newErrBatchCorrupted("bad record: invalid key length") |
| } |
| index.keyPos = o |
| index.keyLen = int(x) |
| o += index.keyLen |
| |
| // Value. |
| if index.keyType == keyTypeVal { |
| x, n = binary.Uvarint(data[o:]) |
| o += n |
| if n <= 0 || o+int(x) > len(data) { |
| return newErrBatchCorrupted("bad record: invalid value length") |
| } |
| index.valuePos = o |
| index.valueLen = int(x) |
| o += index.valueLen |
| } else { |
| index.valuePos = 0 |
| index.valueLen = 0 |
| } |
| |
| if err := fn(i, index); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) { |
| seq, batchLen, err = decodeBatchHeader(data) |
| if err != nil { |
| return 0, 0, err |
| } |
| if seq < expectSeq { |
| return 0, 0, newErrBatchCorrupted("invalid sequence number") |
| } |
| data = data[batchHeaderLen:] |
| var ik []byte |
| var decodedLen int |
| err = decodeBatch(data, func(i int, index batchIndex) error { |
| if i >= batchLen { |
| return newErrBatchCorrupted("invalid records length") |
| } |
| ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType) |
| if err := mdb.Put(ik, index.v(data)); err != nil { |
| return err |
| } |
| decodedLen++ |
| return nil |
| }) |
| if err == nil && decodedLen != batchLen { |
| err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen)) |
| } |
| return |
| } |
| |
| func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte { |
| dst = ensureBuffer(dst, batchHeaderLen) |
| binary.LittleEndian.PutUint64(dst, seq) |
| binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen)) |
| return dst |
| } |
| |
| func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) { |
| if len(data) < batchHeaderLen { |
| return 0, 0, newErrBatchCorrupted("too short") |
| } |
| |
| seq = binary.LittleEndian.Uint64(data) |
| batchLen = int(binary.LittleEndian.Uint32(data[8:])) |
| if batchLen < 0 { |
| return 0, 0, newErrBatchCorrupted("invalid records length") |
| } |
| return |
| } |
| |
| func batchesLen(batches []*Batch) int { |
| batchLen := 0 |
| for _, batch := range batches { |
| batchLen += batch.Len() |
| } |
| return batchLen |
| } |
| |
| func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { |
| if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { |
| return err |
| } |
| for _, batch := range batches { |
| if _, err := wr.Write(batch.data); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |