blob: e35f772a964800df75c2504a59a4765549915d74 [file] [log] [blame]
package reader
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"sync"
"github.com/klauspost/compress/zstd"
syncpool "github.com/mostynb/zstdpool-syncpool"
)
// errNotInitialized is the error returned from Read() by a ReadSeeker that
// hasn't yet had Initialize() called (or whose state was discarded by a
// Close() or SeekOffset() call since the last Initialize()).
//
// stylecheck is disabled because the error text starts with a capital letter,
// and changing the text would be an API change.
var errNotInitialized = errors.New("Not yet initialized") // nolint:stylecheck
// Initializable is an interface containing methods to initialize a ReadSeeker.
type Initializable interface {
	// IsInitialized reports whether Initialize has completed successfully
	// since construction or the last Close/SeekOffset, i.e. whether Read
	// may be called.
	IsInitialized() bool
	// Initialize performs the IO pre-work required before Read can be used.
	Initialize() error
}
// ReadSeeker is an interface used to capture a file reader with seek functionality.
type ReadSeeker interface {
	io.Reader
	io.Closer
	Initializable
	// SeekOffset records an absolute offset (from the start of the stream)
	// to be applied lazily; implementations may defer the actual seek and
	// its errors until the next Initialize.
	SeekOffset(offset int64) error
}
// fileSeeker is a lazily-opened, buffered, seekable file reader; it is the
// plain-file implementation of ReadSeeker.
type fileSeeker struct {
	reader *bufio.Reader // buffered wrapper around f; nil until Initialize
	f *os.File // underlying file handle; nil before Initialize and after Close
	path string // path of the file to open
	buffSize int // buffer size handed to bufio.NewReaderSize
	seekOffset int64 // absolute offset to apply on the next Initialize
	initialized bool // whether Read calls are currently permitted
}
// NewFileReadSeeker wraps a buffered file reader with Seeking functionality.
// Notice that Seek calls un-set the reader and require Initialize calls. This
// is to avoid potentially unnecessary disk IO.
func NewFileReadSeeker(path string, buffsize int) ReadSeeker {
	// The file handle, reader, offset and initialized flag all start at
	// their zero values; the file is only opened by Initialize.
	fs := &fileSeeker{
		path:     path,
		buffSize: buffsize,
	}
	return fs
}
// Close closes the reader. It still can be reopened with Initialize().
func (fio *fileSeeker) Close() error {
	var closeErr error
	fio.initialized = false
	if fio.f != nil {
		closeErr = fio.f.Close()
	}
	// Drop both the handle and the buffered reader so a later Initialize
	// starts from a clean slate.
	fio.reader = nil
	fio.f = nil
	return closeErr
}
// Read implements io.Reader. It fails with errNotInitialized unless
// Initialize has been called since construction or the last Close/Seek.
func (fio *fileSeeker) Read(p []byte) (int, error) {
	if fio.IsInitialized() {
		return fio.reader.Read(p)
	}
	return 0, errNotInitialized
}
// SeekOffset is a simplified version of io.Seeker: only absolute offsets
// from the start of the file are supported. The disk seek itself is
// deferred — and any failure reported — at the next Initialize call.
func (fio *fileSeeker) SeekOffset(offset int64) error {
	// Invalidate the current reader; Initialize will rebuild it at the
	// requested position.
	fio.reader = nil
	fio.initialized = false
	fio.seekOffset = offset
	return nil
}
// IsInitialized reports whether this reader is ready for use; while it
// returns false, Read calls fail with errNotInitialized.
func (fio *fileSeeker) IsInitialized() bool {
	return fio.initialized
}
// Initialize does the required IO pre-work for Read calls to function:
// it opens the file on first use (or after Close), applies the offset
// recorded by SeekOffset, and (re)builds the buffered reader.
//
// Returns an error if called while already initialized, or if the open
// or seek fails.
func (fio *fileSeeker) Initialize() error {
	if fio.initialized {
		// Capitalized to match the style of errNotInitialized above.
		return errors.New("Already initialized") // nolint:stylecheck
	}

	// The file is opened lazily: construction never opens it, and Close
	// sets it back to nil.
	if fio.f == nil {
		var err error
		fio.f, err = os.Open(fio.path)
		if err != nil {
			return err
		}
	}

	off, err := fio.f.Seek(fio.seekOffset, io.SeekStart)
	if err != nil {
		return err
	}
	if off != fio.seekOffset {
		// Note: fixed the previous message's stray trailing comma.
		return fmt.Errorf("File seeking ended at %d. Expected %d.", off, fio.seekOffset)
	}

	// Reuse the existing bufio.Reader when there is one, to avoid
	// reallocating its buffer on every Seek+Initialize cycle.
	if fio.reader == nil {
		fio.reader = bufio.NewReaderSize(fio.f, fio.buffSize)
	} else {
		fio.reader.Reset(fio.f)
	}

	fio.initialized = true
	return nil
}
// The zstd encoder lib will async write to the buffer, so we need
// to lock access to actually check for contents.
//
// syncedBuffer is a bytes.Buffer whose Read/Write/Len/Reset methods all
// take the mutex, making concurrent access from the encoder's background
// goroutine and the consuming Read calls safe.
type syncedBuffer struct {
	mu sync.Mutex // guards buf
	buf *bytes.Buffer // accumulates compressed output
}
// Read drains up to len(p) buffered bytes while holding the lock.
func (sb *syncedBuffer) Read(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	n, err = sb.buf.Read(p)
	return n, err
}
// Write appends p to the buffer while holding the lock; this is the path
// used by the zstd encoder's asynchronous writes.
func (sb *syncedBuffer) Write(p []byte) (n int, err error) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	n, err = sb.buf.Write(p)
	return n, err
}
// Len reports the number of currently buffered bytes, under the lock.
func (sb *syncedBuffer) Len() (n int) {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	n = sb.buf.Len()
	return n
}
// Reset discards all buffered data, under the lock. The backing storage
// is retained for reuse.
func (sb *syncedBuffer) Reset() {
	sb.mu.Lock()
	defer sb.mu.Unlock()
	sb.buf.Reset()
}
// compressedSeeker wraps another ReadSeeker and serves its contents
// zstd-compressed on the fly.
type compressedSeeker struct {
	fs ReadSeeker // the uncompressed data source
	encdW *syncpool.EncoderWrapper // pooled zstd encoder; nil after Read hits EOF/error until the next SeekOffset
	// This keeps the compressed data
	buf *syncedBuffer
}
// encoderInit guards the one-time construction of the shared encoder pool.
var encoderInit sync.Once

// encoders pools zstd encoder wrappers so compressedSeeker instances can
// reuse them instead of allocating a fresh encoder each time.
var encoders *sync.Pool
// NewCompressedFileSeeker creates a ReadSeeker based on a file path.
func NewCompressedFileSeeker(path string, buffsize int) (ReadSeeker, error) {
	inner := NewFileReadSeeker(path, buffsize)
	return NewCompressedSeeker(inner)
}
// NewCompressedSeeker wraps a ReadSeeker to compress its data on the fly.
func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
	// Refuse to stack compression on top of compression.
	if _, alreadyCompressed := fs.(*compressedSeeker); alreadyCompressed {
		return nil, errors.New("trying to double compress files")
	}

	// Build the shared encoder pool exactly once, on first use.
	encoderInit.Do(func() {
		encoders = syncpool.NewEncoderPool(zstd.WithEncoderConcurrency(1))
	})

	sb := &syncedBuffer{buf: new(bytes.Buffer)}

	encdW, ok := encoders.Get().(*syncpool.EncoderWrapper)
	if !ok || encdW == nil {
		return nil, errors.New("failed creating new encoder")
	}
	// Point the pooled encoder at this seeker's output buffer.
	encdW.Reset(sb)

	cfs := &compressedSeeker{
		fs:    fs,
		encdW: encdW,
		buf:   sb,
	}
	return cfs, nil
}
// Read implements io.Reader, returning zstd-compressed bytes of the
// wrapped reader's contents. On the first error or EOF from the source,
// the encoder is closed (flushing its final frame into buf) and returned
// to the pool; subsequent calls just drain buf until io.EOF.
func (cfs *compressedSeeker) Read(p []byte) (int, error) {
	if !cfs.IsInitialized() {
		return 0, errNotInitialized
	}
	var err error
	// Repeatedly encode chunks of input data until there's enough compressed
	// data to fill the output buffer. It can't be known ahead of time how much
	// uncompressed data will correspond to the desired amount of output
	// compressed data, hence the need for a loop.
	//
	// err will be nil until the loop encounters an error. cfs.encdW will be nil
	// when entering the loop if a previous Read call encountered an error or
	// reached an EOF, in which case there's no more data to encode.
	for cfs.buf.Len() < len(p) && err == nil && cfs.encdW != nil {
		var n int
		// Read is allowed to use the entirety of p as a scratchpad.
		n, err = cfs.fs.Read(p)
		// errW must be non-nil if written bytes != n.
		// A write error takes precedence over a nil or io.EOF read error,
		// but never masks a genuine read failure.
		_, errW := cfs.encdW.Write(p[:n])
		if errW != nil && (err == nil || err == io.EOF) {
			err = errW
		}
	}
	if err != nil {
		// When the buffer ends (EOF), or in case of an unexpected error,
		// compress remaining available bytes. The encoder requires a Close call
		// to finish writing compressed data smaller than zstd's window size.
		closeErr := cfs.encdW.Close()
		if err == io.EOF {
			err = closeErr
		}
		// The encoder is done with this stream; recycle it. SeekOffset
		// fetches a replacement when needed.
		encoders.Put(cfs.encdW)
		cfs.encdW = nil
	}
	// Serve whatever compressed bytes are buffered. Once the stream is
	// finished and buf is drained, readErr is io.EOF.
	n, readErr := cfs.buf.Read(p)
	if err == nil {
		err = readErr
	}
	return n, err
}
// SeekOffset repositions the wrapped reader at an absolute offset and
// discards any compressed data buffered so far, so the next Read emits a
// brand-new zstd stream. A previous Read may have returned the encoder to
// the pool; in that case a replacement is fetched here.
func (cfs *compressedSeeker) SeekOffset(offset int64) error {
	if cfs.encdW == nil {
		// The encoder was recycled by a prior Read (EOF/error); get a
		// fresh one from the shared pool.
		encdW, ok := encoders.Get().(*syncpool.EncoderWrapper)
		if !ok || encdW == nil {
			return errors.New("failed to get a new encoder")
		}
		cfs.encdW = encdW
	} else if err := cfs.encdW.Close(); err != nil {
		// The old stream couldn't be finalized. Recycle the encoder and
		// leave the seeker without one; a later SeekOffset re-fetches.
		encoders.Put(cfs.encdW)
		cfs.encdW = nil
		return err
	}

	// Reset the buffer only now, after Close/Get: Close above may flush
	// trailing frame data into it, all of which belongs to the old stream.
	// (A second, earlier Reset before the Close/Get was redundant and has
	// been removed.)
	cfs.buf.Reset()
	cfs.encdW.Reset(cfs.buf)
	return cfs.fs.SeekOffset(offset)
}
// IsInitialized reports whether the underlying ReadSeeker is ready for Read.
func (cfs *compressedSeeker) IsInitialized() bool {
	return cfs.fs.IsInitialized()
}
// Initialize prepares the underlying ReadSeeker for reading.
func (cfs *compressedSeeker) Initialize() error {
	return cfs.fs.Initialize()
}
// Close closes the wrapped ReadSeeker only. The encoder does not need
// closing here — Read already closes and recycles it on EOF or error.
func (cfs *compressedSeeker) Close() error {
	return cfs.fs.Close()
}