| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> |
| // All rights reserved. |
| // |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package table |
| |
| import ( |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "io" |
| |
| "github.com/golang/snappy" |
| |
| "github.com/syndtr/goleveldb/leveldb/comparer" |
| "github.com/syndtr/goleveldb/leveldb/filter" |
| "github.com/syndtr/goleveldb/leveldb/opt" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
// sharedPrefixLen returns the length of the longest common prefix
// shared by a and b.
func sharedPrefixLen(a, b []byte) int {
	limit := len(a)
	if len(b) < limit {
		limit = len(b)
	}
	for i := 0; i < limit; i++ {
		if a[i] != b[i] {
			return i
		}
	}
	return limit
}
| |
// blockWriter accumulates prefix-compressed key/value entries for a
// single table block (data, index, or metaindex) in the LevelDB
// block format: entries followed by a restart-offset array.
type blockWriter struct {
	// restartInterval is the number of entries between restart points;
	// at a restart point the full key is stored (no prefix sharing).
	restartInterval int
	// buf holds the serialized block contents built so far.
	buf util.Buffer
	// nEntries counts entries appended since the last reset.
	nEntries int
	// prevKey is a copy of the most recently appended key, used for
	// prefix compression and (on the data block) index separators.
	prevKey []byte
	// restarts records the buf offset of each restart point.
	restarts []uint32
	// scratch is caller-supplied space for encoding entry headers
	// (three uvarints); see the Writer.scratch comment.
	scratch []byte
}
| |
| func (w *blockWriter) append(key, value []byte) { |
| nShared := 0 |
| if w.nEntries%w.restartInterval == 0 { |
| w.restarts = append(w.restarts, uint32(w.buf.Len())) |
| } else { |
| nShared = sharedPrefixLen(w.prevKey, key) |
| } |
| n := binary.PutUvarint(w.scratch[0:], uint64(nShared)) |
| n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared)) |
| n += binary.PutUvarint(w.scratch[n:], uint64(len(value))) |
| w.buf.Write(w.scratch[:n]) |
| w.buf.Write(key[nShared:]) |
| w.buf.Write(value) |
| w.prevKey = append(w.prevKey[:0], key...) |
| w.nEntries++ |
| } |
| |
| func (w *blockWriter) finish() { |
| // Write restarts entry. |
| if w.nEntries == 0 { |
| // Must have at least one restart entry. |
| w.restarts = append(w.restarts, 0) |
| } |
| w.restarts = append(w.restarts, uint32(len(w.restarts))) |
| for _, x := range w.restarts { |
| buf4 := w.buf.Alloc(4) |
| binary.LittleEndian.PutUint32(buf4, x) |
| } |
| } |
| |
| func (w *blockWriter) reset() { |
| w.buf.Reset() |
| w.nEntries = 0 |
| w.restarts = w.restarts[:0] |
| } |
| |
| func (w *blockWriter) bytesLen() int { |
| restartsLen := len(w.restarts) |
| if restartsLen == 0 { |
| restartsLen = 1 |
| } |
| return w.buf.Len() + 4*restartsLen + 4 |
| } |
| |
// filterWriter builds the table's filter block (e.g. bloom filters),
// one filter per filterBase bytes of data block.
type filterWriter struct {
	// generator produces filter data from the added keys; nil when
	// the table has no filter policy (all methods become no-ops).
	generator filter.FilterGenerator
	// buf holds the serialized filter block contents.
	buf util.Buffer
	// nKeys counts keys added since the last generated filter.
	nKeys int
	// offsets records the start offset of each generated filter.
	offsets []uint32
}
| |
| func (w *filterWriter) add(key []byte) { |
| if w.generator == nil { |
| return |
| } |
| w.generator.Add(key) |
| w.nKeys++ |
| } |
| |
| func (w *filterWriter) flush(offset uint64) { |
| if w.generator == nil { |
| return |
| } |
| for x := int(offset / filterBase); x > len(w.offsets); { |
| w.generate() |
| } |
| } |
| |
| func (w *filterWriter) finish() { |
| if w.generator == nil { |
| return |
| } |
| // Generate last keys. |
| |
| if w.nKeys > 0 { |
| w.generate() |
| } |
| w.offsets = append(w.offsets, uint32(w.buf.Len())) |
| for _, x := range w.offsets { |
| buf4 := w.buf.Alloc(4) |
| binary.LittleEndian.PutUint32(buf4, x) |
| } |
| w.buf.WriteByte(filterBaseLg) |
| } |
| |
| func (w *filterWriter) generate() { |
| // Record offset. |
| w.offsets = append(w.offsets, uint32(w.buf.Len())) |
| // Generate filters. |
| if w.nKeys > 0 { |
| w.generator.Generate(&w.buf) |
| w.nKeys = 0 |
| } |
| } |
| |
// Writer is a table writer. It serializes sorted key/value pairs into
// the LevelDB table format: data blocks, an optional filter block, a
// metaindex block, an index block, and a footer.
type Writer struct {
	writer io.Writer
	// err is sticky: once set (including the "closed" sentinel set by
	// Close), Append and Close return it unchanged.
	err error
	// Options
	cmp         comparer.Comparer
	filter      filter.Filter
	compression opt.Compression
	blockSize   int

	dataBlock   blockWriter
	indexBlock  blockWriter
	filterBlock filterWriter
	// pendingBH is the handle of the last finished data block; it is
	// appended to the index block once the next key (or end of table)
	// is known, so a short separator key can be chosen.
	pendingBH blockHandle
	// offset is the number of bytes written to writer so far.
	offset   uint64
	nEntries int
	// Scratch allocated enough for 5 uvarint. Block writer should not use
	// first 20-bytes since it will be used to encode block handle, which
	// then passed to the block writer itself.
	scratch            [50]byte
	comparerScratch    []byte
	compressionScratch []byte
}
| |
// writeBlock writes a finished block to the file, optionally snappy
// compressed, followed by the 5-byte block trailer (1 type byte +
// 4-byte CRC). It returns the handle (offset, length excluding the
// trailer) of the written block.
func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
	// Compress the buffer if necessary.
	var b []byte
	if compression == opt.SnappyCompression {
		// Allocate scratch enough for compression and block trailer.
		if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
			w.compressionScratch = make([]byte, n)
		}
		compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
		n := len(compressed)
		// The scratch was sized with room for the trailer, so the
		// slice can be extended in place to cover it.
		b = compressed[:n+blockTrailerLen]
		b[n] = blockTypeSnappyCompression
	} else {
		// Uncompressed: append the trailer directly to the block buffer.
		tmp := buf.Alloc(blockTrailerLen)
		tmp[0] = blockTypeNoCompression
		b = buf.Bytes()
	}

	// Calculate the checksum. It covers the block contents plus the
	// compression-type byte; the final 4 trailer bytes hold the CRC.
	n := len(b) - 4
	checksum := util.NewCRC(b[:n]).Value()
	binary.LittleEndian.PutUint32(b[n:], checksum)

	// Write the buffer to the file.
	_, err = w.writer.Write(b)
	if err != nil {
		return
	}
	// The handle length excludes the trailer, matching the reader's
	// expectations.
	bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
	w.offset += uint64(len(b))
	return
}
| |
| func (w *Writer) flushPendingBH(key []byte) { |
| if w.pendingBH.length == 0 { |
| return |
| } |
| var separator []byte |
| if len(key) == 0 { |
| separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey) |
| } else { |
| separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key) |
| } |
| if separator == nil { |
| separator = w.dataBlock.prevKey |
| } else { |
| w.comparerScratch = separator |
| } |
| n := encodeBlockHandle(w.scratch[:20], w.pendingBH) |
| // Append the block handle to the index block. |
| w.indexBlock.append(separator, w.scratch[:n]) |
| // Reset prev key of the data block. |
| w.dataBlock.prevKey = w.dataBlock.prevKey[:0] |
| // Clear pending block handle. |
| w.pendingBH = blockHandle{} |
| } |
| |
| func (w *Writer) finishBlock() error { |
| w.dataBlock.finish() |
| bh, err := w.writeBlock(&w.dataBlock.buf, w.compression) |
| if err != nil { |
| return err |
| } |
| w.pendingBH = bh |
| // Reset the data block. |
| w.dataBlock.reset() |
| // Flush the filter block. |
| w.filterBlock.flush(w.offset) |
| return nil |
| } |
| |
| // Append appends key/value pair to the table. The keys passed must |
| // be in increasing order. |
| // |
| // It is safe to modify the contents of the arguments after Append returns. |
| func (w *Writer) Append(key, value []byte) error { |
| if w.err != nil { |
| return w.err |
| } |
| if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 { |
| w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key) |
| return w.err |
| } |
| |
| w.flushPendingBH(key) |
| // Append key/value pair to the data block. |
| w.dataBlock.append(key, value) |
| // Add key to the filter block. |
| w.filterBlock.add(key) |
| |
| // Finish the data block if block size target reached. |
| if w.dataBlock.bytesLen() >= w.blockSize { |
| if err := w.finishBlock(); err != nil { |
| w.err = err |
| return w.err |
| } |
| } |
| w.nEntries++ |
| return nil |
| } |
| |
| // BlocksLen returns number of blocks written so far. |
| func (w *Writer) BlocksLen() int { |
| n := w.indexBlock.nEntries |
| if w.pendingBH.length > 0 { |
| // Includes the pending block. |
| n++ |
| } |
| return n |
| } |
| |
// EntriesLen returns the number of key/value entries appended so far.
func (w *Writer) EntriesLen() int {
	return w.nEntries
}
| |
// BytesLen returns the number of bytes written to the underlying
// writer so far, including block trailers and (after Close) the footer.
func (w *Writer) BytesLen() int {
	return int(w.offset)
}
| |
// Close will finalize the table. Calling Append is not possible
// after Close, but calling BlocksLen, EntriesLen and BytesLen
// is still possible.
//
// Finalization writes, in order: the last data block, the filter
// block, the metaindex block, the index block, and the footer.
func (w *Writer) Close() error {
	if w.err != nil {
		return w.err
	}

	// Write the last data block. Or empty data block if there
	// aren't any data blocks at all.
	if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
		if err := w.finishBlock(); err != nil {
			w.err = err
			return w.err
		}
	}
	// nil key: the separator only needs to sort after the last key.
	w.flushPendingBH(nil)

	// Write the filter block.
	var filterBH blockHandle
	w.filterBlock.finish()
	if buf := &w.filterBlock.buf; buf.Len() > 0 {
		filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
		if w.err != nil {
			return w.err
		}
	}

	// Write the metaindex block. The data block writer is reused here;
	// it was flushed and reset above, so it is empty at this point.
	if filterBH.length > 0 {
		key := []byte("filter." + w.filter.Name())
		n := encodeBlockHandle(w.scratch[:20], filterBH)
		w.dataBlock.append(key, w.scratch[:n])
	}
	w.dataBlock.finish()
	metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the index block.
	w.indexBlock.finish()
	indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the table footer: metaindex handle, index handle,
	// zero padding, and the table magic.
	footer := w.scratch[:footerLen]
	for i := range footer {
		footer[i] = 0
	}
	n := encodeBlockHandle(footer, metaindexBH)
	encodeBlockHandle(footer[n:], indexBH)
	copy(footer[footerLen-len(magic):], magic)
	if _, err := w.writer.Write(footer); err != nil {
		w.err = err
		return w.err
	}
	w.offset += footerLen

	// Mark the writer as closed; subsequent Append/Close calls return
	// this sentinel error.
	w.err = errors.New("leveldb/table: writer is closed")
	return nil
}
| |
| // NewWriter creates a new initialized table writer for the file. |
| // |
| // Table writer is not goroutine-safe. |
| func NewWriter(f io.Writer, o *opt.Options) *Writer { |
| w := &Writer{ |
| writer: f, |
| cmp: o.GetComparer(), |
| filter: o.GetFilter(), |
| compression: o.GetCompression(), |
| blockSize: o.GetBlockSize(), |
| comparerScratch: make([]byte, 0), |
| } |
| // data block |
| w.dataBlock.restartInterval = o.GetBlockRestartInterval() |
| // The first 20-bytes are used for encoding block handle. |
| w.dataBlock.scratch = w.scratch[20:] |
| // index block |
| w.indexBlock.restartInterval = 1 |
| w.indexBlock.scratch = w.scratch[20:] |
| // filter block |
| if w.filter != nil { |
| w.filterBlock.generator = w.filter.NewGenerator() |
| w.filterBlock.flush(0) |
| } |
| return w |
| } |