blob: 274dee6da855743c4c96df0f64737671a6dd900a [file] [log] [blame]
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package table
import (
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/golang/snappy"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
func sharedPrefixLen(a, b []byte) int {
i, n := 0, len(a)
if n > len(b) {
n = len(b)
}
for i < n && a[i] == b[i] {
i++
}
return i
}
type blockWriter struct {
restartInterval int
buf util.Buffer
nEntries int
prevKey []byte
restarts []uint32
scratch []byte
}
func (w *blockWriter) append(key, value []byte) {
nShared := 0
if w.nEntries%w.restartInterval == 0 {
w.restarts = append(w.restarts, uint32(w.buf.Len()))
} else {
nShared = sharedPrefixLen(w.prevKey, key)
}
n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
w.buf.Write(w.scratch[:n])
w.buf.Write(key[nShared:])
w.buf.Write(value)
w.prevKey = append(w.prevKey[:0], key...)
w.nEntries++
}
func (w *blockWriter) finish() {
// Write restarts entry.
if w.nEntries == 0 {
// Must have at least one restart entry.
w.restarts = append(w.restarts, 0)
}
w.restarts = append(w.restarts, uint32(len(w.restarts)))
for _, x := range w.restarts {
buf4 := w.buf.Alloc(4)
binary.LittleEndian.PutUint32(buf4, x)
}
}
func (w *blockWriter) reset() {
w.buf.Reset()
w.nEntries = 0
w.restarts = w.restarts[:0]
}
func (w *blockWriter) bytesLen() int {
restartsLen := len(w.restarts)
if restartsLen == 0 {
restartsLen = 1
}
return w.buf.Len() + 4*restartsLen + 4
}
type filterWriter struct {
generator filter.FilterGenerator
buf util.Buffer
nKeys int
offsets []uint32
}
func (w *filterWriter) add(key []byte) {
if w.generator == nil {
return
}
w.generator.Add(key)
w.nKeys++
}
func (w *filterWriter) flush(offset uint64) {
if w.generator == nil {
return
}
for x := int(offset / filterBase); x > len(w.offsets); {
w.generate()
}
}
func (w *filterWriter) finish() {
if w.generator == nil {
return
}
// Generate last keys.
if w.nKeys > 0 {
w.generate()
}
w.offsets = append(w.offsets, uint32(w.buf.Len()))
for _, x := range w.offsets {
buf4 := w.buf.Alloc(4)
binary.LittleEndian.PutUint32(buf4, x)
}
w.buf.WriteByte(filterBaseLg)
}
func (w *filterWriter) generate() {
// Record offset.
w.offsets = append(w.offsets, uint32(w.buf.Len()))
// Generate filters.
if w.nKeys > 0 {
w.generator.Generate(&w.buf)
w.nKeys = 0
}
}
// Writer is a table writer.
type Writer struct {
writer io.Writer
err error
// Options
cmp comparer.Comparer
filter filter.Filter
compression opt.Compression
blockSize int
dataBlock blockWriter
indexBlock blockWriter
filterBlock filterWriter
pendingBH blockHandle
offset uint64
nEntries int
// Scratch allocated enough for 5 uvarint. Block writer should not use
// first 20-bytes since it will be used to encode block handle, which
// then passed to the block writer itself.
scratch [50]byte
comparerScratch []byte
compressionScratch []byte
}
func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
// Compress the buffer if necessary.
var b []byte
if compression == opt.SnappyCompression {
// Allocate scratch enough for compression and block trailer.
if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
w.compressionScratch = make([]byte, n)
}
compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
n := len(compressed)
b = compressed[:n+blockTrailerLen]
b[n] = blockTypeSnappyCompression
} else {
tmp := buf.Alloc(blockTrailerLen)
tmp[0] = blockTypeNoCompression
b = buf.Bytes()
}
// Calculate the checksum.
n := len(b) - 4
checksum := util.NewCRC(b[:n]).Value()
binary.LittleEndian.PutUint32(b[n:], checksum)
// Write the buffer to the file.
_, err = w.writer.Write(b)
if err != nil {
return
}
bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
w.offset += uint64(len(b))
return
}
func (w *Writer) flushPendingBH(key []byte) {
if w.pendingBH.length == 0 {
return
}
var separator []byte
if len(key) == 0 {
separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
} else {
separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
}
if separator == nil {
separator = w.dataBlock.prevKey
} else {
w.comparerScratch = separator
}
n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
// Append the block handle to the index block.
w.indexBlock.append(separator, w.scratch[:n])
// Reset prev key of the data block.
w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
// Clear pending block handle.
w.pendingBH = blockHandle{}
}
func (w *Writer) finishBlock() error {
w.dataBlock.finish()
bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
if err != nil {
return err
}
w.pendingBH = bh
// Reset the data block.
w.dataBlock.reset()
// Flush the filter block.
w.filterBlock.flush(w.offset)
return nil
}
// Append appends key/value pair to the table. The keys passed must
// be in increasing order.
//
// It is safe to modify the contents of the arguments after Append returns.
func (w *Writer) Append(key, value []byte) error {
if w.err != nil {
return w.err
}
if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
return w.err
}
w.flushPendingBH(key)
// Append key/value pair to the data block.
w.dataBlock.append(key, value)
// Add key to the filter block.
w.filterBlock.add(key)
// Finish the data block if block size target reached.
if w.dataBlock.bytesLen() >= w.blockSize {
if err := w.finishBlock(); err != nil {
w.err = err
return w.err
}
}
w.nEntries++
return nil
}
// BlocksLen returns number of blocks written so far.
func (w *Writer) BlocksLen() int {
n := w.indexBlock.nEntries
if w.pendingBH.length > 0 {
// Includes the pending block.
n++
}
return n
}
// EntriesLen returns number of entries added so far.
func (w *Writer) EntriesLen() int {
return w.nEntries
}
// BytesLen returns number of bytes written so far.
func (w *Writer) BytesLen() int {
return int(w.offset)
}
// Close will finalize the table. Calling Append is not possible
// after Close, but calling BlocksLen, EntriesLen and BytesLen
// is still possible.
func (w *Writer) Close() error {
if w.err != nil {
return w.err
}
// Write the last data block. Or empty data block if there
// aren't any data blocks at all.
if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
if err := w.finishBlock(); err != nil {
w.err = err
return w.err
}
}
w.flushPendingBH(nil)
// Write the filter block.
var filterBH blockHandle
w.filterBlock.finish()
if buf := &w.filterBlock.buf; buf.Len() > 0 {
filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
if w.err != nil {
return w.err
}
}
// Write the metaindex block.
if filterBH.length > 0 {
key := []byte("filter." + w.filter.Name())
n := encodeBlockHandle(w.scratch[:20], filterBH)
w.dataBlock.append(key, w.scratch[:n])
}
w.dataBlock.finish()
metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
if err != nil {
w.err = err
return w.err
}
// Write the index block.
w.indexBlock.finish()
indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
if err != nil {
w.err = err
return w.err
}
// Write the table footer.
footer := w.scratch[:footerLen]
for i := range footer {
footer[i] = 0
}
n := encodeBlockHandle(footer, metaindexBH)
encodeBlockHandle(footer[n:], indexBH)
copy(footer[footerLen-len(magic):], magic)
if _, err := w.writer.Write(footer); err != nil {
w.err = err
return w.err
}
w.offset += footerLen
w.err = errors.New("leveldb/table: writer is closed")
return nil
}
// NewWriter creates a new initialized table writer for the file.
//
// Table writer is not goroutine-safe.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
w := &Writer{
writer: f,
cmp: o.GetComparer(),
filter: o.GetFilter(),
compression: o.GetCompression(),
blockSize: o.GetBlockSize(),
comparerScratch: make([]byte, 0),
}
// data block
w.dataBlock.restartInterval = o.GetBlockRestartInterval()
// The first 20-bytes are used for encoding block handle.
w.dataBlock.scratch = w.scratch[20:]
// index block
w.indexBlock.restartInterval = 1
w.indexBlock.scratch = w.scratch[20:]
// filter block
if w.filter != nil {
w.filterBlock.generator = w.filter.NewGenerator()
w.filterBlock.flush(0)
}
return w
}