Allow to configure maxConcurrentRequests
Change-Id: Ief4563f31e64bac598e572d3da34e8cf3a174bfb
diff --git a/client.go b/client.go
index d0ccf13..2b883ae 100644
--- a/client.go
+++ b/client.go
@@ -45,6 +45,19 @@
}
}
+// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
+//
+// The default maximum concurrent requests is 64.
+func MaxConcurrentRequestsPerFile(n int) ClientOption {
+ return func(c *Client) error {
+ if n < 1 {
+ return errors.Errorf("n must be greater or equal to 1")
+ }
+ c.maxConcurrentRequests = n
+ return nil
+ }
+}
+
// NewClient creates a new SFTP client on conn, using zero or more option
// functions.
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
@@ -79,7 +92,8 @@
},
inflight: make(map[uint32]chan<- result),
},
- maxPacket: 1 << 15,
+ maxPacket: 1 << 15,
+ maxConcurrentRequests: 64,
}
if err := sftp.applyOptions(opts...); err != nil {
wr.Close()
@@ -106,8 +120,9 @@
type Client struct {
clientConn
- maxPacket int // max packet size read or written.
- nextid uint32
+ maxPacket int // max packet size read or written.
+ nextid uint32
+ maxConcurrentRequests int
}
// Create creates the named file mode 0666 (before umask), truncating it if
@@ -659,8 +674,6 @@
return f.path
}
-const maxConcurrentRequests = 64
-
// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
@@ -675,7 +688,7 @@
offset := f.offset
// maxConcurrentRequests buffer to deal with broadcastErr() floods
// also must have a buffer of max value of (desiredInFlight - inFlight)
- ch := make(chan result, maxConcurrentRequests+1)
+ ch := make(chan result, f.c.maxConcurrentRequests+1)
type inflightRead struct {
b []byte
offset uint64
@@ -740,7 +753,7 @@
if n < len(req.b) {
sendReq(req.b[l:], req.offset+uint64(l))
}
- if desiredInFlight < maxConcurrentRequests {
+ if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++
}
default:
@@ -771,7 +784,7 @@
writeOffset := offset
fileSize := uint64(fi.Size())
// see comment on same line in Read() above
- ch := make(chan result, maxConcurrentRequests+1)
+ ch := make(chan result, f.c.maxConcurrentRequests+1)
type inflightRead struct {
b []byte
offset uint64
@@ -851,7 +864,7 @@
switch {
case offset > fileSize:
desiredInFlight = 1
- case desiredInFlight < maxConcurrentRequests:
+ case desiredInFlight < f.c.maxConcurrentRequests:
desiredInFlight++
}
writeOffset += uint64(nbytes)
@@ -914,7 +927,7 @@
desiredInFlight := 1
offset := f.offset
// see comment on same line in Read() above
- ch := make(chan result, maxConcurrentRequests+1)
+ ch := make(chan result, f.c.maxConcurrentRequests+1)
var firstErr error
written := len(b)
for len(b) > 0 || inFlight > 0 {
@@ -950,7 +963,7 @@
firstErr = err
break
}
- if desiredInFlight < maxConcurrentRequests {
+ if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++
}
default:
@@ -975,7 +988,7 @@
desiredInFlight := 1
offset := f.offset
// see comment on same line in Read() above
- ch := make(chan result, maxConcurrentRequests+1)
+ ch := make(chan result, f.c.maxConcurrentRequests+1)
var firstErr error
read := int64(0)
b := make([]byte, f.c.maxPacket)
@@ -1014,7 +1027,7 @@
firstErr = err
break
}
- if desiredInFlight < maxConcurrentRequests {
+ if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++
}
default: