Allow to configure maxConcurrentRequests

Change-Id: Ief4563f31e64bac598e572d3da34e8cf3a174bfb
diff --git a/client.go b/client.go
index d0ccf13..2b883ae 100644
--- a/client.go
+++ b/client.go
@@ -45,6 +45,19 @@
 	}
 }
 
+// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
+//
+// The default maximum concurrent requests is 64.
+func MaxConcurrentRequestsPerFile(n int) ClientOption {
+	return func(c *Client) error {
+		if n < 1 {
+			return errors.Errorf("n must be greater or equal to 1")
+		}
+		c.maxConcurrentRequests = n
+		return nil
+	}
+}
+
 // NewClient creates a new SFTP client on conn, using zero or more option
 // functions.
 func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
@@ -79,7 +92,8 @@
 			},
 			inflight: make(map[uint32]chan<- result),
 		},
-		maxPacket: 1 << 15,
+		maxPacket:             1 << 15,
+		maxConcurrentRequests: 64,
 	}
 	if err := sftp.applyOptions(opts...); err != nil {
 		wr.Close()
@@ -106,8 +120,9 @@
 type Client struct {
 	clientConn
 
-	maxPacket int // max packet size read or written.
-	nextid    uint32
+	maxPacket             int // max packet size read or written.
+	nextid                uint32
+	maxConcurrentRequests int
 }
 
 // Create creates the named file mode 0666 (before umask), truncating it if
@@ -659,8 +674,6 @@
 	return f.path
 }
 
-const maxConcurrentRequests = 64
-
 // Read reads up to len(b) bytes from the File. It returns the number of bytes
 // read and an error, if any. Read follows io.Reader semantics, so when Read
 // encounters an error or EOF condition after successfully reading n > 0 bytes,
@@ -675,7 +688,7 @@
 	offset := f.offset
 	// maxConcurrentRequests buffer to deal with broadcastErr() floods
 	// also must have a buffer of max value of (desiredInFlight - inFlight)
-	ch := make(chan result, maxConcurrentRequests+1)
+	ch := make(chan result, f.c.maxConcurrentRequests+1)
 	type inflightRead struct {
 		b      []byte
 		offset uint64
@@ -740,7 +753,7 @@
 			if n < len(req.b) {
 				sendReq(req.b[l:], req.offset+uint64(l))
 			}
-			if desiredInFlight < maxConcurrentRequests {
+			if desiredInFlight < f.c.maxConcurrentRequests {
 				desiredInFlight++
 			}
 		default:
@@ -771,7 +784,7 @@
 	writeOffset := offset
 	fileSize := uint64(fi.Size())
 	// see comment on same line in Read() above
-	ch := make(chan result, maxConcurrentRequests+1)
+	ch := make(chan result, f.c.maxConcurrentRequests+1)
 	type inflightRead struct {
 		b      []byte
 		offset uint64
@@ -851,7 +864,7 @@
 				switch {
 				case offset > fileSize:
 					desiredInFlight = 1
-				case desiredInFlight < maxConcurrentRequests:
+				case desiredInFlight < f.c.maxConcurrentRequests:
 					desiredInFlight++
 				}
 				writeOffset += uint64(nbytes)
@@ -914,7 +927,7 @@
 	desiredInFlight := 1
 	offset := f.offset
 	// see comment on same line in Read() above
-	ch := make(chan result, maxConcurrentRequests+1)
+	ch := make(chan result, f.c.maxConcurrentRequests+1)
 	var firstErr error
 	written := len(b)
 	for len(b) > 0 || inFlight > 0 {
@@ -950,7 +963,7 @@
 				firstErr = err
 				break
 			}
-			if desiredInFlight < maxConcurrentRequests {
+			if desiredInFlight < f.c.maxConcurrentRequests {
 				desiredInFlight++
 			}
 		default:
@@ -975,7 +988,7 @@
 	desiredInFlight := 1
 	offset := f.offset
 	// see comment on same line in Read() above
-	ch := make(chan result, maxConcurrentRequests+1)
+	ch := make(chan result, f.c.maxConcurrentRequests+1)
 	var firstErr error
 	read := int64(0)
 	b := make([]byte, f.c.maxPacket)
@@ -1014,7 +1027,7 @@
 				firstErr = err
 				break
 			}
-			if desiredInFlight < maxConcurrentRequests {
+			if desiredInFlight < f.c.maxConcurrentRequests {
 				desiredInFlight++
 			}
 		default: