| package ioutils |
| |
| import ( |
| "errors" |
| "io" |
| "sync" |
| ) |
| |
| // maxCap is the highest capacity to use in byte slices that buffer data. |
| const maxCap = 1e6 |
| |
| // blockThreshold is the minimum number of bytes in the buffer which will cause |
| // a write to BytesPipe to block when allocating a new slice. |
| const blockThreshold = 1e6 |
| |
| // ErrClosed is returned when Write is called on a closed BytesPipe. |
| var ErrClosed = errors.New("write to closed BytesPipe") |
| |
| // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). |
| // All written data may be read at most once. Also, BytesPipe allocates |
| // and releases new byte slices to adjust to current needs, so the buffer |
| // won't be overgrown after peak loads. |
| type BytesPipe struct { |
| mu sync.Mutex |
| wait *sync.Cond |
| buf [][]byte // slice of byte-slices of buffered data |
| lastRead int // index in the first slice to a read point |
| bufLen int // length of data buffered over the slices |
| closeErr error // error to return from next Read. set to nil if not closed. |
| } |
| |
| // NewBytesPipe creates new BytesPipe, initialized by specified slice. |
| // If buf is nil, then it will be initialized with slice which cap is 64. |
| // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). |
| func NewBytesPipe(buf []byte) *BytesPipe { |
| if cap(buf) == 0 { |
| buf = make([]byte, 0, 64) |
| } |
| bp := &BytesPipe{ |
| buf: [][]byte{buf[:0]}, |
| } |
| bp.wait = sync.NewCond(&bp.mu) |
| return bp |
| } |
| |
| // Write writes p to BytesPipe. |
| // It can allocate new []byte slices in a process of writing. |
| func (bp *BytesPipe) Write(p []byte) (int, error) { |
| bp.mu.Lock() |
| defer bp.mu.Unlock() |
| written := 0 |
| loop0: |
| for { |
| if bp.closeErr != nil { |
| return written, ErrClosed |
| } |
| // write data to the last buffer |
| b := bp.buf[len(bp.buf)-1] |
| // copy data to the current empty allocated area |
| n := copy(b[len(b):cap(b)], p) |
| // increment buffered data length |
| bp.bufLen += n |
| // include written data in last buffer |
| bp.buf[len(bp.buf)-1] = b[:len(b)+n] |
| |
| written += n |
| |
| // if there was enough room to write all then break |
| if len(p) == n { |
| break |
| } |
| |
| // more data: write to the next slice |
| p = p[n:] |
| |
| // block if too much data is still in the buffer |
| for bp.bufLen >= blockThreshold { |
| bp.wait.Wait() |
| if bp.closeErr != nil { |
| continue loop0 |
| } |
| } |
| |
| // allocate slice that has twice the size of the last unless maximum reached |
| nextCap := 2 * cap(bp.buf[len(bp.buf)-1]) |
| if nextCap > maxCap { |
| nextCap = maxCap |
| } |
| // add new byte slice to the buffers slice and continue writing |
| bp.buf = append(bp.buf, make([]byte, 0, nextCap)) |
| } |
| bp.wait.Broadcast() |
| return written, nil |
| } |
| |
| // CloseWithError causes further reads from a BytesPipe to return immediately. |
| func (bp *BytesPipe) CloseWithError(err error) error { |
| bp.mu.Lock() |
| if err != nil { |
| bp.closeErr = err |
| } else { |
| bp.closeErr = io.EOF |
| } |
| bp.wait.Broadcast() |
| bp.mu.Unlock() |
| return nil |
| } |
| |
| // Close causes further reads from a BytesPipe to return immediately. |
| func (bp *BytesPipe) Close() error { |
| return bp.CloseWithError(nil) |
| } |
| |
| func (bp *BytesPipe) len() int { |
| return bp.bufLen - bp.lastRead |
| } |
| |
| // Read reads bytes from BytesPipe. |
| // Data could be read only once. |
| func (bp *BytesPipe) Read(p []byte) (n int, err error) { |
| bp.mu.Lock() |
| defer bp.mu.Unlock() |
| if bp.len() == 0 { |
| if bp.closeErr != nil { |
| return 0, bp.closeErr |
| } |
| bp.wait.Wait() |
| if bp.len() == 0 && bp.closeErr != nil { |
| return 0, bp.closeErr |
| } |
| } |
| for { |
| read := copy(p, bp.buf[0][bp.lastRead:]) |
| n += read |
| bp.lastRead += read |
| if bp.len() == 0 { |
| // we have read everything. reset to the beginning. |
| bp.lastRead = 0 |
| bp.bufLen -= len(bp.buf[0]) |
| bp.buf[0] = bp.buf[0][:0] |
| break |
| } |
| // break if everything was read |
| if len(p) == read { |
| break |
| } |
| // more buffered data and more asked. read from next slice. |
| p = p[read:] |
| bp.lastRead = 0 |
| bp.bufLen -= len(bp.buf[0]) |
| bp.buf[0] = nil // throw away old slice |
| bp.buf = bp.buf[1:] // switch to next |
| } |
| bp.wait.Broadcast() |
| return |
| } |