| // Copyright 2011 The LevelDB-Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0 |
// License, authors and contributors information can be found at the URLs below, respectively:
| // https://code.google.com/p/leveldb-go/source/browse/LICENSE |
| // https://code.google.com/p/leveldb-go/source/browse/AUTHORS |
| // https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS |
| |
| // Package journal reads and writes sequences of journals. Each journal is a stream |
| // of bytes that completes before the next journal starts. |
| // |
| // When reading, call Next to obtain an io.Reader for the next journal. Next will |
| // return io.EOF when there are no more journals. It is valid to call Next |
| // without reading the current journal to exhaustion. |
| // |
| // When writing, call Next to obtain an io.Writer for the next journal. Calling |
| // Next finishes the current journal. Call Close to finish the final journal. |
| // |
| // Optionally, call Flush to finish the current journal and flush the underlying |
| // writer without starting a new journal. To start a new journal after flushing, |
| // call Next. |
| // |
// Neither Readers nor Writers are safe to use concurrently.
| // |
| // Example code: |
| // func read(r io.Reader) ([]string, error) { |
| // var ss []string |
| // journals := journal.NewReader(r, nil, true, true) |
| // for { |
| // j, err := journals.Next() |
| // if err == io.EOF { |
| // break |
| // } |
| // if err != nil { |
| // return nil, err |
| // } |
| // s, err := ioutil.ReadAll(j) |
| // if err != nil { |
| // return nil, err |
| // } |
| // ss = append(ss, string(s)) |
| // } |
| // return ss, nil |
| // } |
| // |
| // func write(w io.Writer, ss []string) error { |
| // journals := journal.NewWriter(w) |
| // for _, s := range ss { |
| // j, err := journals.Next() |
| // if err != nil { |
| // return err |
| // } |
//			if _, err := j.Write([]byte(s)); err != nil {
| // return err |
| // } |
| // } |
| // return journals.Close() |
| // } |
| // |
| // The wire format is that the stream is divided into 32KiB blocks, and each |
| // block contains a number of tightly packed chunks. Chunks cannot cross block |
| // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a |
| // block must be zero. |
| // |
| // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4 |
| // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type) |
| // followed by a payload. The checksum is over the chunk type and the payload. |
| // |
| // There are four chunk types: whether the chunk is the full journal, or the |
| // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal |
| // has one first chunk, zero or more middle chunks, and one last chunk. |
| // |
| // The wire format allows for limited recovery in the face of data corruption: |
| // on a format error (such as a checksum mismatch), the reader moves to the |
| // next block and looks for the next full or first chunk. |
| package journal |
| |
| import ( |
| "encoding/binary" |
| "fmt" |
| "io" |
| |
| "github.com/syndtr/goleveldb/leveldb/errors" |
| "github.com/syndtr/goleveldb/leveldb/util" |
| ) |
| |
// Chunk types, stored in the last byte of every chunk header. These values are
// part of the wire format and must never change.
const (
	fullChunkType   = 1 // the chunk holds an entire journal
	firstChunkType  = 2 // first chunk of a multi-chunk journal
	middleChunkType = 3 // interior chunk of a multi-chunk journal
	lastChunkType   = 4 // final chunk of a multi-chunk journal
)

// Block and header geometry of the wire format.
const (
	blockSize  = 32 << 10  // the stream is divided into 32 KiB blocks
	headerSize = 4 + 2 + 1 // checksum (4) + little-endian length (2) + chunk type (1)
)
| |
// flusher is the optional interface an underlying io.Writer may implement;
// when it does, Writer.Flush calls its Flush method after writing buffered data.
type flusher interface{ Flush() error }
| |
| // ErrCorrupted is the error type that generated by corrupted block or chunk. |
| type ErrCorrupted struct { |
| Size int |
| Reason string |
| } |
| |
| func (e *ErrCorrupted) Error() string { |
| return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size) |
| } |
| |
// Dropper is implemented by callers that want to be notified whenever the
// journal Reader drops a block or skips a chunk. The Reader passes an
// *ErrCorrupted describing what was discarded.
type Dropper interface {
	Drop(err error)
}
| |
// Reader reads journals from an underlying io.Reader. Input is consumed one
// block at a time into buf; Next hands out a reader for each journal.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// dropper, if non-nil, is notified of every dropped block or skipped chunk.
	dropper Dropper
	// strict makes corruption (other than orphan chunks) halt the Reader
	// instead of being skipped.
	strict bool
	// checksum enables verification of each chunk's checksum.
	checksum bool
	// seq is the sequence number of the current journal. Next and Reset bump it
	// so that readers handed out earlier can detect they are stale.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// last is whether the current chunk is the last chunk of the journal.
	last bool
	// err is any accumulated error; once set, Next and the per-journal readers
	// return it.
	err error
	// buf holds the block currently being decoded.
	buf [blockSize]byte
}
| |
// NewReader returns a Reader that decodes journals from r.
//
// dropper may be nil; otherwise it is told about every dropped block or
// skipped chunk. If strict is true, a corrupted or invalid chunk halts the
// Reader entirely (orphan chunks are still only reported and skipped). If
// checksum is true, each chunk's checksum is verified.
func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
	jr := new(Reader)
	jr.r = r
	jr.dropper = dropper
	jr.strict = strict
	jr.checksum = checksum
	jr.last = true
	return jr
}
| |
| var errSkip = errors.New("leveldb/journal: skipped") |
| |
| func (r *Reader) corrupt(n int, reason string, skip bool) error { |
| if r.dropper != nil { |
| r.dropper.Drop(&ErrCorrupted{n, reason}) |
| } |
| if r.strict && !skip { |
| r.err = errors.NewErrCorrupted(nil, &ErrCorrupted{n, reason}) |
| return r.err |
| } |
| return errSkip |
| } |
| |
// nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload. It reads the
// next block into the buffer when the current block has no complete header
// left.
//
// first reports whether the caller expects the start of a journal, i.e. a full
// or first chunk. On success r.last records whether the chunk ends the journal.
//
// It returns:
//   - nil when a chunk was loaded;
//   - errSkip when corruption was reported to the dropper and skipped
//     (non-strict mode, or an orphan chunk in any mode);
//   - the accumulated corruption error in strict mode;
//   - io.EOF when the input ends at a journal boundary;
//   - any other error from the underlying reader, unchanged.
func (r *Reader) nextChunk(first bool) error {
	for {
		if r.j+headerSize <= r.n {
			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
			chunkType := r.buf[r.j+6]
			// Bytes from this header to the end of the valid data; this is what
			// is reported when the rest of the block is dropped.
			unprocBlock := r.n - r.j

			if checksum == 0 && length == 0 && chunkType == 0 {
				// All-zero header: drop the entire block.
				r.i = r.n
				r.j = r.n
				return r.corrupt(unprocBlock, "zero header", false)
			}
			if chunkType < fullChunkType || chunkType > lastChunkType {
				// Not one of the four wire-format chunk types. Without this
				// check, an unknown type would be accepted as a middle chunk
				// whenever checksum verification is disabled.
				r.i = r.n
				r.j = r.n
				return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
			}

			r.i = r.j + headerSize
			r.j = r.j + headerSize + int(length)
			if r.j > r.n {
				// Drop entire block.
				r.i = r.n
				r.j = r.n
				return r.corrupt(unprocBlock, "chunk length overflows block", false)
			}
			// The stored checksum covers the chunk type byte (at r.i-1) and
			// the payload.
			if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
				// Drop entire block.
				r.i = r.n
				r.j = r.n
				return r.corrupt(unprocBlock, "checksum mismatch", false)
			}
			if first && chunkType != fullChunkType && chunkType != firstChunkType {
				// A middle or last chunk with no journal start before it:
				// report it and skip only this chunk.
				chunkLength := (r.j - r.i) + headerSize
				r.i = r.j
				return r.corrupt(chunkLength, "orphan chunk", true)
			}
			r.last = chunkType == fullChunkType || chunkType == lastChunkType
			return nil
		}

		// A short, non-empty block can only be the final one, so nothing
		// more can be read.
		if r.n < blockSize && r.n > 0 {
			if !first {
				return r.corrupt(0, "missing chunk part", false)
			}
			r.err = io.EOF
			return r.err
		}

		// Read the next block; a short read at the end of input is expected.
		// NOTE(review): other read errors are returned without being recorded
		// in r.err, so a later call will try to read again.
		n, err := io.ReadFull(r.r, r.buf[:])
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			return err
		}
		if n == 0 {
			if !first {
				return r.corrupt(0, "missing chunk part", false)
			}
			r.err = io.EOF
			return r.err
		}
		r.i, r.j, r.n = 0, 0, n
	}
}
| |
// Next returns an io.Reader for the next journal, or io.EOF when there are no
// more journals. Chunks that cannot start a journal are reported and skipped.
// The returned reader becomes stale after the next Next or Reset call and must
// not be used afterwards. If strict is false, a journal whose remaining chunks
// are corrupted or missing makes its reader return io.ErrUnexpectedEOF.
func (r *Reader) Next() (io.Reader, error) {
	r.seq++
	if r.err != nil {
		return nil, r.err
	}
	// Discard whatever is left unread of the current chunk.
	r.i = r.j
	err := r.nextChunk(true)
	for err == errSkip {
		err = r.nextChunk(true)
	}
	if err != nil {
		return nil, err
	}
	return &singleReader{r: r, seq: r.seq}, nil
}
| |
// Reset rebinds the Reader to a new underlying reader and new options so it
// can be reused. It discards buffered data and invalidates readers returned by
// earlier Next calls. It returns the error the Reader had accumulated before
// the reset, or nil.
func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
	r.seq++
	prevErr := r.err
	r.r, r.dropper = reader, dropper
	r.strict, r.checksum = strict, checksum
	r.i, r.j, r.n = 0, 0, 0
	r.last = true
	r.err = nil
	return prevErr
}
| |
// singleReader is the io.Reader (and io.ByteReader) returned by Reader.Next for
// a single journal. It is stale once the parent's seq no longer matches.
// NOTE: composite literals of this type may be positional; keep field order.
type singleReader struct {
	// r is the parent Reader, which owns the block buffer.
	r *Reader
	// seq is the parent's sequence number when this reader was created.
	seq int
	// err is a sticky error from reading this journal's chunks.
	err error
}
| |
// Read implements io.Reader for one journal. It copies payload bytes from the
// current chunk and pulls in the next chunk of the same journal when the
// current one is used up. It returns io.EOF after the journal's last chunk,
// io.ErrUnexpectedEOF if the journal was cut short by skipped corruption, and
// an error if the reader is stale.
func (x *singleReader) Read(p []byte) (int, error) {
	jr := x.r
	switch {
	case jr.seq != x.seq:
		return 0, errors.New("leveldb/journal: stale reader")
	case x.err != nil:
		return 0, x.err
	case jr.err != nil:
		return 0, jr.err
	}
	for jr.i == jr.j {
		if jr.last {
			return 0, io.EOF
		}
		if err := jr.nextChunk(false); err != nil {
			if err == errSkip {
				err = io.ErrUnexpectedEOF
			}
			// Keep the failure so later calls return it too.
			x.err = err
			return 0, err
		}
	}
	n := copy(p, jr.buf[jr.i:jr.j])
	jr.i += n
	return n, nil
}
| |
// ReadByte implements io.ByteReader for one journal, with the same staleness,
// end-of-journal and error behavior as Read.
func (x *singleReader) ReadByte() (byte, error) {
	jr := x.r
	if jr.seq != x.seq {
		return 0, errors.New("leveldb/journal: stale reader")
	}
	if err := x.err; err != nil {
		return 0, err
	}
	if err := jr.err; err != nil {
		return 0, err
	}
	for jr.i == jr.j {
		if jr.last {
			return 0, io.EOF
		}
		err := jr.nextChunk(false)
		if err == errSkip {
			err = io.ErrUnexpectedEOF
		}
		if err != nil {
			x.err = err
			return 0, err
		}
	}
	b := jr.buf[jr.i]
	jr.i++
	return b, nil
}
| |
// Writer writes journals to an underlying io.Writer, buffering one block at a
// time in buf.
type Writer struct {
	// w is the underlying writer.
	w io.Writer
	// seq is the sequence number of the current journal. Next, Flush, Close and
	// Reset bump it so that writers handed out earlier can detect they are stale.
	seq int
	// f is w as a flusher, or nil if w does not implement Flush() error.
	f flusher
	// buf[i:j] is the bytes that will become the current chunk.
	// The low bound, i, includes the chunk header.
	i, j int
	// buf[:written] has already been written to w.
	// written is zero unless Flush has been called.
	written int
	// first is whether the current chunk is the first chunk of the journal.
	first bool
	// pending is whether a chunk is buffered but not yet written.
	pending bool
	// err is any accumulated error; once set, Next, Flush and Close return it.
	err error
	// buf holds the block currently being assembled.
	buf [blockSize]byte
}
| |
// NewWriter returns a Writer that encodes journals onto w. If w also has a
// Flush() error method, Writer.Flush calls it after writing buffered data.
func NewWriter(w io.Writer) *Writer {
	jw := &Writer{w: w}
	if f, ok := w.(flusher); ok {
		jw.f = f
	}
	return jw
}
| |
| // fillHeader fills in the header for the pending chunk. |
| func (w *Writer) fillHeader(last bool) { |
| if w.i+headerSize > w.j || w.j > blockSize { |
| panic("leveldb/journal: bad writer state") |
| } |
| if last { |
| if w.first { |
| w.buf[w.i+6] = fullChunkType |
| } else { |
| w.buf[w.i+6] = lastChunkType |
| } |
| } else { |
| if w.first { |
| w.buf[w.i+6] = firstChunkType |
| } else { |
| w.buf[w.i+6] = middleChunkType |
| } |
| } |
| binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value()) |
| binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize)) |
| } |
| |
| // writeBlock writes the buffered block to the underlying writer, and reserves |
| // space for the next chunk's header. |
| func (w *Writer) writeBlock() { |
| _, w.err = w.w.Write(w.buf[w.written:]) |
| w.i = 0 |
| w.j = headerSize |
| w.written = 0 |
| } |
| |
| // writePending finishes the current journal and writes the buffer to the |
| // underlying writer. |
| func (w *Writer) writePending() { |
| if w.err != nil { |
| return |
| } |
| if w.pending { |
| w.fillHeader(true) |
| w.pending = false |
| } |
| _, w.err = w.w.Write(w.buf[w.written:w.j]) |
| w.written = w.j |
| } |
| |
| // Close finishes the current journal and closes the writer. |
| func (w *Writer) Close() error { |
| w.seq++ |
| w.writePending() |
| if w.err != nil { |
| return w.err |
| } |
| w.err = errors.New("leveldb/journal: closed Writer") |
| return nil |
| } |
| |
| // Flush finishes the current journal, writes to the underlying writer, and |
| // flushes it if that writer implements interface{ Flush() error }. |
| func (w *Writer) Flush() error { |
| w.seq++ |
| w.writePending() |
| if w.err != nil { |
| return w.err |
| } |
| if w.f != nil { |
| w.err = w.f.Flush() |
| return w.err |
| } |
| return nil |
| } |
| |
// Reset finishes the current journal and rebinds the Writer to writer so it
// can be reused. The journal is not finished if the Writer already holds an
// error, e.g. after Close. Reset returns the error from finishing the journal,
// if any. A previously accumulated error is discarded, not returned.
func (w *Writer) Reset(writer io.Writer) (err error) {
	w.seq++
	if w.err == nil {
		w.writePending()
		err = w.err
	}
	f, _ := writer.(flusher)
	w.w, w.f = writer, f
	w.i, w.j, w.written = 0, 0, 0
	w.first, w.pending = false, false
	w.err = nil
	return err
}
| |
// Next finishes the current journal, if any, and starts a new one, returning an
// io.Writer for it. The returned writer becomes stale after the next Close,
// Flush, Next or Reset call and must not be used afterwards.
func (w *Writer) Next() (io.Writer, error) {
	w.seq++
	if w.err != nil {
		return nil, w.err
	}
	// Mark the previous journal's pending chunk as its final chunk.
	if w.pending {
		w.fillHeader(true)
	}
	// Place the new chunk's header immediately after the previous chunk.
	w.i, w.j = w.j, w.j+headerSize
	if w.j > blockSize {
		// No room for a header in this block: zero the remainder, emit the
		// block, and start the chunk at the beginning of a fresh one.
		tail := w.buf[w.i:]
		for k := range tail {
			tail[k] = 0
		}
		w.writeBlock()
		if w.err != nil {
			return nil, w.err
		}
	}
	w.first, w.pending = true, true
	return singleWriter{w: w, seq: w.seq}, nil
}
| |
// singleWriter is the io.Writer returned by Writer.Next for a single journal.
// It is stale once the parent's seq no longer matches.
// NOTE: composite literals of this type may be positional; keep field order.
type singleWriter struct {
	// w is the parent Writer, which owns the block buffer.
	w *Writer
	// seq is the parent's sequence number when this writer was created.
	seq int
}
| |
| func (x singleWriter) Write(p []byte) (int, error) { |
| w := x.w |
| if w.seq != x.seq { |
| return 0, errors.New("leveldb/journal: stale writer") |
| } |
| if w.err != nil { |
| return 0, w.err |
| } |
| n0 := len(p) |
| for len(p) > 0 { |
| // Write a block, if it is full. |
| if w.j == blockSize { |
| w.fillHeader(false) |
| w.writeBlock() |
| if w.err != nil { |
| return 0, w.err |
| } |
| w.first = false |
| } |
| // Copy bytes into the buffer. |
| n := copy(w.buf[w.j:], p) |
| w.j += n |
| p = p[n:] |
| } |
| return n0, nil |
| } |