| // The `fwd` package provides a buffered reader |
| // and writer. Each has methods that help improve |
| // the encoding/decoding performance of some binary |
| // protocols. |
| // |
| // The `fwd.Writer` and `fwd.Reader` type provide similar |
| // functionality to their counterparts in `bufio`, plus |
| // a few extra utility methods that simplify read-ahead |
| // and write-ahead. I wrote this package to improve serialization |
| // performance for http://github.com/tinylib/msgp, |
| // where it provided about a 2x speedup over `bufio` for certain |
| // workloads. However, care must be taken to understand the semantics of the |
| // extra methods provided by this package, as they allow |
| // the user to access and manipulate the buffer memory |
| // directly. |
| // |
| // The extra methods for `fwd.Reader` are `Peek`, `Skip` |
| // and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`, |
| // will re-allocate the read buffer in order to accommodate arbitrarily |
| // large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes |
| // in the stream, and uses the `io.Seeker` interface if the underlying |
| // stream implements it. `(*fwd.Reader).Next` returns a slice pointing |
| // to the next `n` bytes in the read buffer (like `Peek`), but also |
| // increments the read position. This allows users to process streams |
| // in arbitrary block sizes without having to manage appropriately-sized |
| // slices. Additionally, obviating the need to copy the data from the |
| // buffer to another location in memory can improve performance dramatically |
| // in CPU-bound applications. |
| // |
| // `fwd.Writer` only has one extra method, which is `(*fwd.Writer).Next`, which |
| // returns a slice pointing to the next `n` bytes of the writer, and increments |
| // the write position by the length of the returned slice. This allows users |
| // to write directly to the end of the buffer. |
| // |
| package fwd |
| |
| import "io" |
| |
| const ( |
| // DefaultReaderSize is the default size of the read buffer |
| DefaultReaderSize = 2048 |
| |
| // minimum read buffer; straight from bufio |
| minReaderSize = 16 |
| ) |
| |
| // NewReader returns a new *Reader that reads from 'r' |
| func NewReader(r io.Reader) *Reader { |
| return NewReaderSize(r, DefaultReaderSize) |
| } |
| |
| // NewReaderSize returns a new *Reader that |
| // reads from 'r' and has a buffer size 'n' |
| func NewReaderSize(r io.Reader, n int) *Reader { |
| rd := &Reader{ |
| r: r, |
| data: make([]byte, 0, max(minReaderSize, n)), |
| } |
| if s, ok := r.(io.Seeker); ok { |
| rd.rs = s |
| } |
| return rd |
| } |
| |
| // Reader is a buffered look-ahead reader |
| type Reader struct { |
| r io.Reader // underlying reader |
| |
| // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space |
| data []byte // data |
| n int // read offset |
| state error // last read error |
| |
| // if the reader past to NewReader was |
| // also an io.Seeker, this is non-nil |
| rs io.Seeker |
| } |
| |
| // Reset resets the underlying reader |
| // and the read buffer. |
| func (r *Reader) Reset(rd io.Reader) { |
| r.r = rd |
| r.data = r.data[0:0] |
| r.n = 0 |
| r.state = nil |
| if s, ok := rd.(io.Seeker); ok { |
| r.rs = s |
| } else { |
| r.rs = nil |
| } |
| } |
| |
| // more() does one read on the underlying reader |
| func (r *Reader) more() { |
| // move data backwards so that |
| // the read offset is 0; this way |
| // we can supply the maximum number of |
| // bytes to the reader |
| if r.n != 0 { |
| if r.n < len(r.data) { |
| r.data = r.data[:copy(r.data[0:], r.data[r.n:])] |
| } else { |
| r.data = r.data[:0] |
| } |
| r.n = 0 |
| } |
| var a int |
| a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)]) |
| if a == 0 && r.state == nil { |
| r.state = io.ErrNoProgress |
| return |
| } |
| r.data = r.data[:len(r.data)+a] |
| } |
| |
| // pop error |
| func (r *Reader) err() (e error) { |
| e, r.state = r.state, nil |
| return |
| } |
| |
| // pop error; EOF -> io.ErrUnexpectedEOF |
| func (r *Reader) noEOF() (e error) { |
| e, r.state = r.state, nil |
| if e == io.EOF { |
| e = io.ErrUnexpectedEOF |
| } |
| return |
| } |
| |
| // buffered bytes |
| func (r *Reader) buffered() int { return len(r.data) - r.n } |
| |
| // Buffered returns the number of bytes currently in the buffer |
| func (r *Reader) Buffered() int { return len(r.data) - r.n } |
| |
| // BufferSize returns the total size of the buffer |
| func (r *Reader) BufferSize() int { return cap(r.data) } |
| |
| // Peek returns the next 'n' buffered bytes, |
| // reading from the underlying reader if necessary. |
| // It will only return a slice shorter than 'n' bytes |
| // if it also returns an error. Peek does not advance |
| // the reader. EOF errors are *not* returned as |
| // io.ErrUnexpectedEOF. |
| func (r *Reader) Peek(n int) ([]byte, error) { |
| // in the degenerate case, |
| // we may need to realloc |
| // (the caller asked for more |
| // bytes than the size of the buffer) |
| if cap(r.data) < n { |
| old := r.data[r.n:] |
| r.data = make([]byte, n+r.buffered()) |
| r.data = r.data[:copy(r.data, old)] |
| r.n = 0 |
| } |
| |
| // keep filling until |
| // we hit an error or |
| // read enough bytes |
| for r.buffered() < n && r.state == nil { |
| r.more() |
| } |
| |
| // we must have hit an error |
| if r.buffered() < n { |
| return r.data[r.n:], r.err() |
| } |
| |
| return r.data[r.n : r.n+n], nil |
| } |
| |
| // Skip moves the reader forward 'n' bytes. |
| // Returns the number of bytes skipped and any |
| // errors encountered. It is analogous to Seek(n, 1). |
| // If the underlying reader implements io.Seeker, then |
| // that method will be used to skip forward. |
| // |
| // If the reader encounters |
| // an EOF before skipping 'n' bytes, it |
| // returns io.ErrUnexpectedEOF. If the |
| // underlying reader implements io.Seeker, then |
| // those rules apply instead. (Many implementations |
| // will not return `io.EOF` until the next call |
| // to Read.) |
| func (r *Reader) Skip(n int) (int, error) { |
| |
| // fast path |
| if r.buffered() >= n { |
| r.n += n |
| return n, nil |
| } |
| |
| // use seeker implementation |
| // if we can |
| if r.rs != nil { |
| return r.skipSeek(n) |
| } |
| |
| // loop on filling |
| // and then erasing |
| o := n |
| for r.buffered() < n && r.state == nil { |
| r.more() |
| // we can skip forward |
| // up to r.buffered() bytes |
| step := min(r.buffered(), n) |
| r.n += step |
| n -= step |
| } |
| // at this point, n should be |
| // 0 if everything went smoothly |
| return o - n, r.noEOF() |
| } |
| |
| // Next returns the next 'n' bytes in the stream. |
| // Unlike Peek, Next advances the reader position. |
| // The returned bytes point to the same |
| // data as the buffer, so the slice is |
| // only valid until the next reader method call. |
| // An EOF is considered an unexpected error. |
| // If an the returned slice is less than the |
| // length asked for, an error will be returned, |
| // and the reader position will not be incremented. |
| func (r *Reader) Next(n int) ([]byte, error) { |
| |
| // in case the buffer is too small |
| if cap(r.data) < n { |
| old := r.data[r.n:] |
| r.data = make([]byte, n+r.buffered()) |
| r.data = r.data[:copy(r.data, old)] |
| r.n = 0 |
| } |
| |
| // fill at least 'n' bytes |
| for r.buffered() < n && r.state == nil { |
| r.more() |
| } |
| |
| if r.buffered() < n { |
| return r.data[r.n:], r.noEOF() |
| } |
| out := r.data[r.n : r.n+n] |
| r.n += n |
| return out, nil |
| } |
| |
| // skipSeek uses the io.Seeker to seek forward. |
| // only call this function when n > r.buffered() |
| func (r *Reader) skipSeek(n int) (int, error) { |
| o := r.buffered() |
| // first, clear buffer |
| n -= o |
| r.n = 0 |
| r.data = r.data[:0] |
| |
| // then seek forward remaning bytes |
| i, err := r.rs.Seek(int64(n), 1) |
| return int(i) + o, err |
| } |
| |
| // Read implements `io.Reader` |
| func (r *Reader) Read(b []byte) (int, error) { |
| // if we have data in the buffer, just |
| // return that. |
| if r.buffered() != 0 { |
| x := copy(b, r.data[r.n:]) |
| r.n += x |
| return x, nil |
| } |
| var n int |
| // we have no buffered data; determine |
| // whether or not to buffer or call |
| // the underlying reader directly |
| if len(b) >= cap(r.data) { |
| n, r.state = r.r.Read(b) |
| } else { |
| r.more() |
| n = copy(b, r.data) |
| r.n = n |
| } |
| if n == 0 { |
| return 0, r.err() |
| } |
| return n, nil |
| } |
| |
| // ReadFull attempts to read len(b) bytes into |
| // 'b'. It returns the number of bytes read into |
| // 'b', and an error if it does not return len(b). |
| // EOF is considered an unexpected error. |
| func (r *Reader) ReadFull(b []byte) (int, error) { |
| var n int // read into b |
| var nn int // scratch |
| l := len(b) |
| // either read buffered data, |
| // or read directly for the underlying |
| // buffer, or fetch more buffered data. |
| for n < l && r.state == nil { |
| if r.buffered() != 0 { |
| nn = copy(b[n:], r.data[r.n:]) |
| n += nn |
| r.n += nn |
| } else if l-n > cap(r.data) { |
| nn, r.state = r.r.Read(b[n:]) |
| n += nn |
| } else { |
| r.more() |
| } |
| } |
| if n < l { |
| return n, r.noEOF() |
| } |
| return n, nil |
| } |
| |
| // ReadByte implements `io.ByteReader` |
| func (r *Reader) ReadByte() (byte, error) { |
| for r.buffered() < 1 && r.state == nil { |
| r.more() |
| } |
| if r.buffered() < 1 { |
| return 0, r.err() |
| } |
| b := r.data[r.n] |
| r.n++ |
| return b, nil |
| } |
| |
| // WriteTo implements `io.WriterTo` |
| func (r *Reader) WriteTo(w io.Writer) (int64, error) { |
| var ( |
| i int64 |
| ii int |
| err error |
| ) |
| // first, clear buffer |
| if r.buffered() > 0 { |
| ii, err = w.Write(r.data[r.n:]) |
| i += int64(ii) |
| if err != nil { |
| return i, err |
| } |
| r.data = r.data[0:0] |
| r.n = 0 |
| } |
| for r.state == nil { |
| // here we just do |
| // 1:1 reads and writes |
| r.more() |
| if r.buffered() > 0 { |
| ii, err = w.Write(r.data) |
| i += int64(ii) |
| if err != nil { |
| return i, err |
| } |
| r.data = r.data[0:0] |
| r.n = 0 |
| } |
| } |
| if r.state != io.EOF { |
| return i, r.err() |
| } |
| return i, nil |
| } |
| |
| func min(a int, b int) int { |
| if a < b { |
| return a |
| } |
| return b |
| } |
| |
| func max(a int, b int) int { |
| if a < b { |
| return b |
| } |
| return a |
| } |