Move packer ordering code into packet-manager
The worker/packet mangement code needs to be in the packet manager so
the request-server can utilize it as well. This also improves the
encapsulation of the method as it relied on internal data that should be
better isolated inside the file/struct.
diff --git a/packet-manager.go b/packet-manager.go
index bee1246..3f5f0c0 100644
--- a/packet-manager.go
+++ b/packet-manager.go
@@ -33,7 +33,7 @@
sender: sender,
working: &sync.WaitGroup{},
}
- go s.worker()
+ go s.controller()
return s
}
@@ -50,13 +50,58 @@
s.working.Done()
}
-// shut down packetManager worker
+// shut down packetManager controller
func (s packetManager) close() {
+ // pause until current packets are processed
+ s.working.Wait()
close(s.fini)
}
+// Passed a worker function, returns a channel for incoming packets.
+// The goal is to process packets in the order they are received as is
+// requires by section 7 of the RFC, while maximizing throughput of file
+// transfers.
+// XXX incomingChan // newPacketChannel // incomingPktChan
+func (s *packetManager) workerChan(worker func(requestChan)) requestChan {
+
+ rwChan := make(chan requestPacket, sftpServerWorkerCount)
+ for i := 0; i < sftpServerWorkerCount; i++ {
+ go worker(rwChan)
+ }
+
+ cmdChan := make(chan requestPacket)
+ go worker(cmdChan)
+
+ pktChan := make(chan requestPacket, sftpServerWorkerCount)
+ go func() {
+ // start with cmdChan
+ curChan := cmdChan
+ for pkt := range pktChan {
+ // on file open packet, switch to rwChan
+ switch pkt.(type) {
+ case *sshFxpOpenPacket:
+ curChan = rwChan
+ // on file close packet, switch back to cmdChan
+ // after waiting for any reads/writes to finish
+ case *sshFxpClosePacket:
+ // wait for rwChan to finish
+ s.working.Wait()
+ // stop using rwChan
+ curChan = cmdChan
+ }
+ s.incomingPacket(pkt)
+ curChan <- pkt
+ }
+ close(rwChan)
+ close(cmdChan)
+ s.close()
+ }()
+
+ return pktChan
+}
+
// process packets
-func (s *packetManager) worker() {
+func (s *packetManager) controller() {
for {
select {
case pkt := <-s.requests:
diff --git a/packet-manager_test.go b/packet-manager_test.go
index 96ff4b6..39299c6 100644
--- a/packet-manager_test.go
+++ b/packet-manager_test.go
@@ -2,7 +2,9 @@
import (
"encoding"
+ "sync"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -86,3 +88,51 @@
}
s.close()
}
+
+// Test what happens when the pool processes a close packet on a file that it
+// is still reading from.
+func TestCloseOutOfOrder(t *testing.T) {
+ packets := []requestPacket{
+ &sshFxpRemovePacket{ID: 0, Filename: "foo"},
+ &sshFxpOpenPacket{ID: 1},
+ &sshFxpWritePacket{ID: 2, Handle: "foo"},
+ &sshFxpWritePacket{ID: 3, Handle: "foo"},
+ &sshFxpWritePacket{ID: 4, Handle: "foo"},
+ &sshFxpWritePacket{ID: 5, Handle: "foo"},
+ &sshFxpClosePacket{ID: 6, Handle: "foo"},
+ &sshFxpRemovePacket{ID: 7, Filename: "foo"},
+ }
+
+ recvChan := make(chan requestPacket, len(packets)+1)
+ sender := newTestSender()
+ pktMgr := newPktMgr(sender)
+ wg := sync.WaitGroup{}
+ wg.Add(len(packets))
+ worker := func(ch requestChan) {
+ for pkt := range ch {
+ if _, ok := pkt.(*sshFxpWritePacket); ok {
+ // sleep to cause writes to come after close/remove
+ time.Sleep(time.Millisecond)
+ }
+ pktMgr.working.Done()
+ recvChan <- pkt
+ wg.Done()
+ }
+ }
+ pktChan := pktMgr.workerChan(worker)
+ for _, p := range packets {
+ pktChan <- p
+ }
+ wg.Wait()
+ close(recvChan)
+ received := []requestPacket{}
+ for p := range recvChan {
+ received = append(received, p)
+ }
+ if received[len(received)-2].id() != packets[len(packets)-2].id() {
+ t.Fatal("Packets processed out of order1:", received, packets)
+ }
+ if received[len(received)-1].id() != packets[len(packets)-1].id() {
+ t.Fatal("Packets processed out of order2:", received, packets)
+ }
+}
diff --git a/packet-typing.go b/packet-typing.go
index 5e714d1..920851d 100644
--- a/packet-typing.go
+++ b/packet-typing.go
@@ -12,6 +12,8 @@
id() uint32
}
+type requestChan chan requestPacket
+
type responsePacket interface {
encoding.BinaryMarshaler
id() uint32
diff --git a/server.go b/server.go
index a140b27..2bc1561 100644
--- a/server.go
+++ b/server.go
@@ -278,45 +278,6 @@
}
}
-type requestChan chan requestPacket
-
-func (svr *Server) sftpServerWorkers(worker func(requestChan)) requestChan {
-
- rwChan := make(chan requestPacket, sftpServerWorkerCount)
- for i := 0; i < sftpServerWorkerCount; i++ {
- go worker(rwChan)
- }
-
- cmdChan := make(chan requestPacket)
- go worker(cmdChan)
-
- pktChan := make(chan requestPacket, sftpServerWorkerCount)
- go func() {
- // start with cmdChan
- curChan := cmdChan
- for pkt := range pktChan {
- // on file open packet, switch to rwChan
- switch pkt.(type) {
- case *sshFxpOpenPacket:
- curChan = rwChan
- // on file close packet, switch back to cmdChan
- // after waiting for any reads/writes to finish
- case *sshFxpClosePacket:
- // wait for rwChan to finish
- svr.pktMgr.working.Wait()
- // stop using rwChan
- curChan = cmdChan
- }
- svr.pktMgr.incomingPacket(pkt)
- curChan <- pkt
- }
- close(rwChan)
- close(cmdChan)
- }()
-
- return pktChan
-}
-
// Serve serves SFTP connections until the streams stop or the SFTP subsystem
// is stopped.
func (svr *Server) Serve() error {
@@ -329,7 +290,7 @@
svr.conn.Close() // shuts down recvPacket
}
}
- pktChan := svr.sftpServerWorkers(workerFunc)
+ pktChan := svr.pktMgr.workerChan(workerFunc)
var err error
var pkt requestPacket
@@ -352,9 +313,8 @@
}
wg.Done()
- close(pktChan) // shuts down sftpServerWorkers
- wg.Wait() // wait for all workers to exit
- svr.pktMgr.close() // shuts down packetManager
+ close(pktChan) // shuts down sftpServerWorkers
+ wg.Wait() // wait for all workers to exit
// close any still-open files
for handle, file := range svr.openFiles {
diff --git a/server_test.go b/server_test.go
index 804b5d3..721acc6 100644
--- a/server_test.go
+++ b/server_test.go
@@ -4,7 +4,6 @@
"io"
"sync"
"testing"
- "time"
)
func clientServerPair(t *testing.T) (*Client, *Server) {
@@ -67,55 +66,6 @@
}
-// Test what happens when the pool processes a close packet on a file that it
-// is still reading from.
-func TestCloseOutOfOrder(t *testing.T) {
- packets := []requestPacket{
- &sshFxpRemovePacket{ID: 0, Filename: "foo"},
- &sshFxpOpenPacket{ID: 1},
- &sshFxpWritePacket{ID: 2, Handle: "foo"},
- &sshFxpWritePacket{ID: 3, Handle: "foo"},
- &sshFxpWritePacket{ID: 4, Handle: "foo"},
- &sshFxpWritePacket{ID: 5, Handle: "foo"},
- &sshFxpClosePacket{ID: 6, Handle: "foo"},
- &sshFxpRemovePacket{ID: 7, Filename: "foo"},
- }
-
- recvChan := make(chan requestPacket, len(packets)+1)
- sender := newTestSender()
- pm := newPktMgr(sender)
- svr := Server{pktMgr: pm}
- wg := sync.WaitGroup{}
- wg.Add(len(packets))
- worker := func(ch requestChan) {
- for pkt := range ch {
- if _, ok := pkt.(*sshFxpWritePacket); ok {
- // sleep to cause writes to come after close/remove
- time.Sleep(time.Millisecond)
- }
- pm.working.Done()
- recvChan <- pkt
- wg.Done()
- }
- }
- pktChan := svr.sftpServerWorkers(worker)
- for _, p := range packets {
- pktChan <- p
- }
- wg.Wait()
- close(recvChan)
- received := []requestPacket{}
- for p := range recvChan {
- received = append(received, p)
- }
- if received[len(received)-2].id() != packets[len(packets)-2].id() {
- t.Fatal("Packets processed out of order1:", received, packets)
- }
- if received[len(received)-1].id() != packets[len(packets)-1].id() {
- t.Fatal("Packets processed out of order2:", received, packets)
- }
-}
-
// test that server handles concurrent requests correctly
func TestConcurrentRequests(t *testing.T) {
client, server := clientServerPair(t)