blob: a1d7612fd31fabc84a03e9675e09b053d42beca8 [file] [log] [blame]
package client
import (
"errors"
"io"
"time"
)
var ErrTimeout = errors.New("timeout")
// timeoutReader wraps an io.Reader and times out if the read rate is lower
// than chunkSize per second
// TODO: use gracePeriod
type timeoutReader struct {
r io.Reader
gracePeriod time.Duration
chunkSize int
}
const (
defaultGracePeriod = 4 * time.Second
defaultChunkSize = 8 * 1024
)
// newTimeoutReader returns a timeoutReader with default gracePeriod and chunkSize
func newTimeoutReader(r io.Reader) *timeoutReader {
return &timeoutReader{r, defaultGracePeriod, defaultChunkSize}
}
// readResult represents the return value of a read
type readResult struct {
n int
err error
}
// Read reads from t.r, timing out if the read rate is lower than t.chunkSize per second
func (t *timeoutReader) Read(p []byte) (int, error) {
if len(p) < t.chunkSize {
timeout := (time.Duration(len(p)) * time.Second) / time.Duration(t.chunkSize)
return t.readWithTimeout(p, timeout)
}
var pos int
for {
size := t.chunkSize
if size > len(p)-pos {
size = len(p) - pos
}
m, err := t.readWithTimeout(p[pos:size], time.Second)
pos += m
if pos == len(p) || err != nil {
return pos, err
}
}
}
func (t *timeoutReader) readWithTimeout(p []byte, timeout time.Duration) (int, error) {
done := make(chan *readResult)
go func() {
res := &readResult{}
res.n, res.err = t.r.Read(p)
done <- res
}()
select {
case res := <-done:
return res.n, res.err
case <-time.After(timeout):
return 0, ErrTimeout
}
}