blob: d094c3d0f8a5dc0d4845173ef027dd618348dd5f [file] [log] [blame]
// Copyright 2011 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
// License, authors and contributors informations can be found at bellow URLs respectively:
// https://code.google.com/p/leveldb-go/source/browse/LICENSE
// https://code.google.com/p/leveldb-go/source/browse/AUTHORS
// https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
// Package journal reads and writes sequences of journals. Each journal is a stream
// of bytes that completes before the next journal starts.
//
// When reading, call Next to obtain an io.Reader for the next journal. Next will
// return io.EOF when there are no more journals. It is valid to call Next
// without reading the current journal to exhaustion.
//
// When writing, call Next to obtain an io.Writer for the next journal. Calling
// Next finishes the current journal. Call Close to finish the final journal.
//
// Optionally, call Flush to finish the current journal and flush the underlying
// writer without starting a new journal. To start a new journal after flushing,
// call Next.
//
// Neither Readers or Writers are safe to use concurrently.
//
// Example code:
// func read(r io.Reader) ([]string, error) {
// var ss []string
// journals := journal.NewReader(r, nil, true, true)
// for {
// j, err := journals.Next()
// if err == io.EOF {
// break
// }
// if err != nil {
// return nil, err
// }
// s, err := ioutil.ReadAll(j)
// if err != nil {
// return nil, err
// }
// ss = append(ss, string(s))
// }
// return ss, nil
// }
//
// func write(w io.Writer, ss []string) error {
// journals := journal.NewWriter(w)
// for _, s := range ss {
// j, err := journals.Next()
// if err != nil {
// return err
// }
// if _, err := j.Write([]byte(s)), err != nil {
// return err
// }
// }
// return journals.Close()
// }
//
// The wire format is that the stream is divided into 32KiB blocks, and each
// block contains a number of tightly packed chunks. Chunks cannot cross block
// boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
// block must be zero.
//
// A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
// byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
// followed by a payload. The checksum is over the chunk type and the payload.
//
// There are four chunk types: whether the chunk is the full journal, or the
// first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
// has one first chunk, zero or more middle chunks, and one last chunk.
//
// The wire format allows for limited recovery in the face of data corruption:
// on a format error (such as a checksum mismatch), the reader moves to the
// next block and looks for the next full or first chunk.
package journal
import (
"encoding/binary"
"fmt"
"io"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
// These constants are part of the wire format and should not be changed.
const (
fullChunkType = 1
firstChunkType = 2
middleChunkType = 3
lastChunkType = 4
)
const (
blockSize = 32 * 1024
headerSize = 7
)
type flusher interface {
Flush() error
}
// ErrCorrupted is the error type that generated by corrupted block or chunk.
type ErrCorrupted struct {
Size int
Reason string
}
func (e *ErrCorrupted) Error() string {
return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
}
// Dropper is the interface that wrap simple Drop method. The Drop
// method will be called when the journal reader dropping a block or chunk.
type Dropper interface {
Drop(err error)
}
// Reader reads journals from an underlying io.Reader.
type Reader struct {
// r is the underlying reader.
r io.Reader
// the dropper.
dropper Dropper
// strict flag.
strict bool
// checksum flag.
checksum bool
// seq is the sequence number of the current journal.
seq int
// buf[i:j] is the unread portion of the current chunk's payload.
// The low bound, i, excludes the chunk header.
i, j int
// n is the number of bytes of buf that are valid. Once reading has started,
// only the final block can have n < blockSize.
n int
// last is whether the current chunk is the last chunk of the journal.
last bool
// err is any accumulated error.
err error
// buf is the buffer.
buf [blockSize]byte
}
// NewReader returns a new reader. The dropper may be nil, and if
// strict is true then corrupted or invalid chunk will halt the journal
// reader entirely.
func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
return &Reader{
r: r,
dropper: dropper,
strict: strict,
checksum: checksum,
last: true,
}
}
var errSkip = errors.New("leveldb/journal: skipped")
func (r *Reader) corrupt(n int, reason string, skip bool) error {
if r.dropper != nil {
r.dropper.Drop(&ErrCorrupted{n, reason})
}
if r.strict && !skip {
r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
return r.err
}
return errSkip
}
// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
// next block into the buffer if necessary.
func (r *Reader) nextChunk(first bool) error {
for {
if r.j+headerSize <= r.n {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6]
unprocBlock := r.n - r.j
if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "zero header", false)
}
if chunkType < fullChunkType || chunkType > lastChunkType {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
}
r.i = r.j + headerSize
r.j = r.j + headerSize + int(length)
if r.j > r.n {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "chunk length overflows block", false)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "checksum mismatch", false)
}
if first && chunkType != fullChunkType && chunkType != firstChunkType {
chunkLength := (r.j - r.i) + headerSize
r.i = r.j
// Report the error, but skip it.
return r.corrupt(chunkLength, "orphan chunk", true)
}
r.last = chunkType == fullChunkType || chunkType == lastChunkType
return nil
}
// The last block.
if r.n < blockSize && r.n > 0 {
if !first {
return r.corrupt(0, "missing chunk part", false)
}
r.err = io.EOF
return r.err
}
// Read block.
n, err := io.ReadFull(r.r, r.buf[:])
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
if n == 0 {
if !first {
return r.corrupt(0, "missing chunk part", false)
}
r.err = io.EOF
return r.err
}
r.i, r.j, r.n = 0, 0, n
}
}
// Next returns a reader for the next journal. It returns io.EOF if there are no
// more journals. The reader returned becomes stale after the next Next call,
// and should no longer be used. If strict is false, the reader will returns
// io.ErrUnexpectedEOF error when found corrupted journal.
func (r *Reader) Next() (io.Reader, error) {
r.seq++
if r.err != nil {
return nil, r.err
}
r.i = r.j
for {
if err := r.nextChunk(true); err == nil {
break
} else if err != errSkip {
return nil, err
}
}
return &singleReader{r, r.seq, nil}, nil
}
// Reset resets the journal reader, allows reuse of the journal reader. Reset returns
// last accumulated error.
func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
r.seq++
err := r.err
r.r = reader
r.dropper = dropper
r.strict = strict
r.checksum = checksum
r.i = 0
r.j = 0
r.n = 0
r.last = true
r.err = nil
return err
}
type singleReader struct {
r *Reader
seq int
err error
}
func (x *singleReader) Read(p []byte) (int, error) {
r := x.r
if r.seq != x.seq {
return 0, errors.New("leveldb/journal: stale reader")
}
if x.err != nil {
return 0, x.err
}
if r.err != nil {
return 0, r.err
}
for r.i == r.j {
if r.last {
return 0, io.EOF
}
x.err = r.nextChunk(false)
if x.err != nil {
if x.err == errSkip {
x.err = io.ErrUnexpectedEOF
}
return 0, x.err
}
}
n := copy(p, r.buf[r.i:r.j])
r.i += n
return n, nil
}
func (x *singleReader) ReadByte() (byte, error) {
r := x.r
if r.seq != x.seq {
return 0, errors.New("leveldb/journal: stale reader")
}
if x.err != nil {
return 0, x.err
}
if r.err != nil {
return 0, r.err
}
for r.i == r.j {
if r.last {
return 0, io.EOF
}
x.err = r.nextChunk(false)
if x.err != nil {
if x.err == errSkip {
x.err = io.ErrUnexpectedEOF
}
return 0, x.err
}
}
c := r.buf[r.i]
r.i++
return c, nil
}
// Writer writes journals to an underlying io.Writer.
type Writer struct {
// w is the underlying writer.
w io.Writer
// seq is the sequence number of the current journal.
seq int
// f is w as a flusher.
f flusher
// buf[i:j] is the bytes that will become the current chunk.
// The low bound, i, includes the chunk header.
i, j int
// buf[:written] has already been written to w.
// written is zero unless Flush has been called.
written int
// first is whether the current chunk is the first chunk of the journal.
first bool
// pending is whether a chunk is buffered but not yet written.
pending bool
// err is any accumulated error.
err error
// buf is the buffer.
buf [blockSize]byte
}
// NewWriter returns a new Writer.
func NewWriter(w io.Writer) *Writer {
f, _ := w.(flusher)
return &Writer{
w: w,
f: f,
}
}
// fillHeader fills in the header for the pending chunk.
func (w *Writer) fillHeader(last bool) {
if w.i+headerSize > w.j || w.j > blockSize {
panic("leveldb/journal: bad writer state")
}
if last {
if w.first {
w.buf[w.i+6] = fullChunkType
} else {
w.buf[w.i+6] = lastChunkType
}
} else {
if w.first {
w.buf[w.i+6] = firstChunkType
} else {
w.buf[w.i+6] = middleChunkType
}
}
binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
}
// writeBlock writes the buffered block to the underlying writer, and reserves
// space for the next chunk's header.
func (w *Writer) writeBlock() {
_, w.err = w.w.Write(w.buf[w.written:])
w.i = 0
w.j = headerSize
w.written = 0
}
// writePending finishes the current journal and writes the buffer to the
// underlying writer.
func (w *Writer) writePending() {
if w.err != nil {
return
}
if w.pending {
w.fillHeader(true)
w.pending = false
}
_, w.err = w.w.Write(w.buf[w.written:w.j])
w.written = w.j
}
// Close finishes the current journal and closes the writer.
func (w *Writer) Close() error {
w.seq++
w.writePending()
if w.err != nil {
return w.err
}
w.err = errors.New("leveldb/journal: closed Writer")
return nil
}
// Flush finishes the current journal, writes to the underlying writer, and
// flushes it if that writer implements interface{ Flush() error }.
func (w *Writer) Flush() error {
w.seq++
w.writePending()
if w.err != nil {
return w.err
}
if w.f != nil {
w.err = w.f.Flush()
return w.err
}
return nil
}
// Reset resets the journal writer, allows reuse of the journal writer. Reset
// will also closes the journal writer if not already.
func (w *Writer) Reset(writer io.Writer) (err error) {
w.seq++
if w.err == nil {
w.writePending()
err = w.err
}
w.w = writer
w.f, _ = writer.(flusher)
w.i = 0
w.j = 0
w.written = 0
w.first = false
w.pending = false
w.err = nil
return
}
// Next returns a writer for the next journal. The writer returned becomes stale
// after the next Close, Flush or Next call, and should no longer be used.
func (w *Writer) Next() (io.Writer, error) {
w.seq++
if w.err != nil {
return nil, w.err
}
if w.pending {
w.fillHeader(true)
}
w.i = w.j
w.j = w.j + headerSize
// Check if there is room in the block for the header.
if w.j > blockSize {
// Fill in the rest of the block with zeroes.
for k := w.i; k < blockSize; k++ {
w.buf[k] = 0
}
w.writeBlock()
if w.err != nil {
return nil, w.err
}
}
w.first = true
w.pending = true
return singleWriter{w, w.seq}, nil
}
type singleWriter struct {
w *Writer
seq int
}
func (x singleWriter) Write(p []byte) (int, error) {
w := x.w
if w.seq != x.seq {
return 0, errors.New("leveldb/journal: stale writer")
}
if w.err != nil {
return 0, w.err
}
n0 := len(p)
for len(p) > 0 {
// Write a block, if it is full.
if w.j == blockSize {
w.fillHeader(false)
w.writeBlock()
if w.err != nil {
return 0, w.err
}
w.first = false
}
// Copy bytes into the buffer.
n := copy(w.buf[w.j:], p)
w.j += n
p = p[n:]
}
return n0, nil
}