Allow to configure maxConcurrentRequests Change-Id: Ief4563f31e64bac598e572d3da34e8cf3a174bfb
diff --git a/client.go b/client.go index d0ccf13..2b883ae 100644 --- a/client.go +++ b/client.go
@@ -45,6 +45,19 @@ } } +// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file. +// +// The default maximum concurrent requests is 64. +func MaxConcurrentRequestsPerFile(n int) ClientOption { + return func(c *Client) error { + if n < 1 { + return errors.Errorf("n must be greater or equal to 1") + } + c.maxConcurrentRequests = n + return nil + } +} + // NewClient creates a new SFTP client on conn, using zero or more option // functions. func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) { @@ -79,7 +92,8 @@ }, inflight: make(map[uint32]chan<- result), }, - maxPacket: 1 << 15, + maxPacket: 1 << 15, + maxConcurrentRequests: 64, } if err := sftp.applyOptions(opts...); err != nil { wr.Close() @@ -106,8 +120,9 @@ type Client struct { clientConn - maxPacket int // max packet size read or written. - nextid uint32 + maxPacket int // max packet size read or written. + nextid uint32 + maxConcurrentRequests int } // Create creates the named file mode 0666 (before umask), truncating it if @@ -659,8 +674,6 @@ return f.path } -const maxConcurrentRequests = 64 - // Read reads up to len(b) bytes from the File. It returns the number of bytes // read and an error, if any. Read follows io.Reader semantics, so when Read // encounters an error or EOF condition after successfully reading n > 0 bytes, @@ -675,7 +688,7 @@ offset := f.offset // maxConcurrentRequests buffer to deal with broadcastErr() floods // also must have a buffer of max value of (desiredInFlight - inFlight) - ch := make(chan result, maxConcurrentRequests+1) + ch := make(chan result, f.c.maxConcurrentRequests+1) type inflightRead struct { b []byte offset uint64 @@ -740,7 +753,7 @@ if n < len(req.b) { sendReq(req.b[l:], req.offset+uint64(l)) } - if desiredInFlight < maxConcurrentRequests { + if desiredInFlight < f.c.maxConcurrentRequests { desiredInFlight++ } default: @@ -771,7 +784,7 @@ writeOffset := offset fileSize := uint64(fi.Size()) // see comment on same line in Read() above - ch := make(chan result, maxConcurrentRequests+1) + ch := make(chan result, f.c.maxConcurrentRequests+1) type inflightRead struct { b []byte offset uint64 @@ -851,7 +864,7 @@ switch { case offset > fileSize: desiredInFlight = 1 - case desiredInFlight < maxConcurrentRequests: + case desiredInFlight < f.c.maxConcurrentRequests: desiredInFlight++ } writeOffset += uint64(nbytes) @@ -914,7 +927,7 @@ desiredInFlight := 1 offset := f.offset // see comment on same line in Read() above - ch := make(chan result, maxConcurrentRequests+1) + ch := make(chan result, f.c.maxConcurrentRequests+1) var firstErr error written := len(b) for len(b) > 0 || inFlight > 0 { @@ -950,7 +963,7 @@ firstErr = err break } - if desiredInFlight < maxConcurrentRequests { + if desiredInFlight < f.c.maxConcurrentRequests { desiredInFlight++ } default: @@ -975,7 +988,7 @@ desiredInFlight := 1 offset := f.offset // see comment on same line in Read() above - ch := make(chan result, maxConcurrentRequests+1) + ch := make(chan result, f.c.maxConcurrentRequests+1) var firstErr error read := int64(0) b := make([]byte, f.c.maxPacket) @@ -1014,7 +1027,7 @@ firstErr = err break } - if desiredInFlight < maxConcurrentRequests { + if desiredInFlight < f.c.maxConcurrentRequests { desiredInFlight++ } default: