Move packer ordering code into packet-manager

The worker/packet mangement code needs to be in the packet manager so
the request-server can utilize it as well. This also improves the
encapsulation of the method as it relied on internal data that should be
better isolated inside the file/struct.
diff --git a/packet-manager.go b/packet-manager.go
index bee1246..3f5f0c0 100644
--- a/packet-manager.go
+++ b/packet-manager.go
@@ -33,7 +33,7 @@
 		sender:    sender,
 		working:   &sync.WaitGroup{},
 	}
-	go s.worker()
+	go s.controller()
 	return s
 }
 
@@ -50,13 +50,58 @@
 	s.working.Done()
 }
 
-// shut down packetManager worker
+// shut down packetManager controller
 func (s packetManager) close() {
+	// pause until current packets are processed
+	s.working.Wait()
 	close(s.fini)
 }
 
+// Passed a worker function, returns a channel for incoming packets.
+// The goal is to process packets in the order they are received as is
+// requires by section 7 of the RFC, while maximizing throughput of file
+// transfers.
+// XXX incomingChan // newPacketChannel // incomingPktChan
+func (s *packetManager) workerChan(worker func(requestChan)) requestChan {
+
+	rwChan := make(chan requestPacket, sftpServerWorkerCount)
+	for i := 0; i < sftpServerWorkerCount; i++ {
+		go worker(rwChan)
+	}
+
+	cmdChan := make(chan requestPacket)
+	go worker(cmdChan)
+
+	pktChan := make(chan requestPacket, sftpServerWorkerCount)
+	go func() {
+		// start with cmdChan
+		curChan := cmdChan
+		for pkt := range pktChan {
+			// on file open packet, switch to rwChan
+			switch pkt.(type) {
+			case *sshFxpOpenPacket:
+				curChan = rwChan
+			// on file close packet, switch back to cmdChan
+			// after waiting for any reads/writes to finish
+			case *sshFxpClosePacket:
+				// wait for rwChan to finish
+				s.working.Wait()
+				// stop using rwChan
+				curChan = cmdChan
+			}
+			s.incomingPacket(pkt)
+			curChan <- pkt
+		}
+		close(rwChan)
+		close(cmdChan)
+		s.close()
+	}()
+
+	return pktChan
+}
+
 // process packets
-func (s *packetManager) worker() {
+func (s *packetManager) controller() {
 	for {
 		select {
 		case pkt := <-s.requests:
diff --git a/packet-manager_test.go b/packet-manager_test.go
index 96ff4b6..39299c6 100644
--- a/packet-manager_test.go
+++ b/packet-manager_test.go
@@ -2,7 +2,9 @@
 
 import (
 	"encoding"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -86,3 +88,51 @@
 	}
 	s.close()
 }
+
+// Test what happens when the pool processes a close packet on a file that it
+// is still reading from.
+func TestCloseOutOfOrder(t *testing.T) {
+	packets := []requestPacket{
+		&sshFxpRemovePacket{ID: 0, Filename: "foo"},
+		&sshFxpOpenPacket{ID: 1},
+		&sshFxpWritePacket{ID: 2, Handle: "foo"},
+		&sshFxpWritePacket{ID: 3, Handle: "foo"},
+		&sshFxpWritePacket{ID: 4, Handle: "foo"},
+		&sshFxpWritePacket{ID: 5, Handle: "foo"},
+		&sshFxpClosePacket{ID: 6, Handle: "foo"},
+		&sshFxpRemovePacket{ID: 7, Filename: "foo"},
+	}
+
+	recvChan := make(chan requestPacket, len(packets)+1)
+	sender := newTestSender()
+	pktMgr := newPktMgr(sender)
+	wg := sync.WaitGroup{}
+	wg.Add(len(packets))
+	worker := func(ch requestChan) {
+		for pkt := range ch {
+			if _, ok := pkt.(*sshFxpWritePacket); ok {
+				// sleep to cause writes to come after close/remove
+				time.Sleep(time.Millisecond)
+			}
+			pktMgr.working.Done()
+			recvChan <- pkt
+			wg.Done()
+		}
+	}
+	pktChan := pktMgr.workerChan(worker)
+	for _, p := range packets {
+		pktChan <- p
+	}
+	wg.Wait()
+	close(recvChan)
+	received := []requestPacket{}
+	for p := range recvChan {
+		received = append(received, p)
+	}
+	if received[len(received)-2].id() != packets[len(packets)-2].id() {
+		t.Fatal("Packets processed out of order1:", received, packets)
+	}
+	if received[len(received)-1].id() != packets[len(packets)-1].id() {
+		t.Fatal("Packets processed out of order2:", received, packets)
+	}
+}
diff --git a/packet-typing.go b/packet-typing.go
index 5e714d1..920851d 100644
--- a/packet-typing.go
+++ b/packet-typing.go
@@ -12,6 +12,8 @@
 	id() uint32
 }
 
+type requestChan chan requestPacket
+
 type responsePacket interface {
 	encoding.BinaryMarshaler
 	id() uint32
diff --git a/server.go b/server.go
index a140b27..2bc1561 100644
--- a/server.go
+++ b/server.go
@@ -278,45 +278,6 @@
 	}
 }
 
-type requestChan chan requestPacket
-
-func (svr *Server) sftpServerWorkers(worker func(requestChan)) requestChan {
-
-	rwChan := make(chan requestPacket, sftpServerWorkerCount)
-	for i := 0; i < sftpServerWorkerCount; i++ {
-		go worker(rwChan)
-	}
-
-	cmdChan := make(chan requestPacket)
-	go worker(cmdChan)
-
-	pktChan := make(chan requestPacket, sftpServerWorkerCount)
-	go func() {
-		// start with cmdChan
-		curChan := cmdChan
-		for pkt := range pktChan {
-			// on file open packet, switch to rwChan
-			switch pkt.(type) {
-			case *sshFxpOpenPacket:
-				curChan = rwChan
-			// on file close packet, switch back to cmdChan
-			// after waiting for any reads/writes to finish
-			case *sshFxpClosePacket:
-				// wait for rwChan to finish
-				svr.pktMgr.working.Wait()
-				// stop using rwChan
-				curChan = cmdChan
-			}
-			svr.pktMgr.incomingPacket(pkt)
-			curChan <- pkt
-		}
-		close(rwChan)
-		close(cmdChan)
-	}()
-
-	return pktChan
-}
-
 // Serve serves SFTP connections until the streams stop or the SFTP subsystem
 // is stopped.
 func (svr *Server) Serve() error {
@@ -329,7 +290,7 @@
 			svr.conn.Close() // shuts down recvPacket
 		}
 	}
-	pktChan := svr.sftpServerWorkers(workerFunc)
+	pktChan := svr.pktMgr.workerChan(workerFunc)
 
 	var err error
 	var pkt requestPacket
@@ -352,9 +313,8 @@
 	}
 	wg.Done()
 
-	close(pktChan)     // shuts down sftpServerWorkers
-	wg.Wait()          // wait for all workers to exit
-	svr.pktMgr.close() // shuts down packetManager
+	close(pktChan) // shuts down sftpServerWorkers
+	wg.Wait()      // wait for all workers to exit
 
 	// close any still-open files
 	for handle, file := range svr.openFiles {
diff --git a/server_test.go b/server_test.go
index 804b5d3..721acc6 100644
--- a/server_test.go
+++ b/server_test.go
@@ -4,7 +4,6 @@
 	"io"
 	"sync"
 	"testing"
-	"time"
 )
 
 func clientServerPair(t *testing.T) (*Client, *Server) {
@@ -67,55 +66,6 @@
 
 }
 
-// Test what happens when the pool processes a close packet on a file that it
-// is still reading from.
-func TestCloseOutOfOrder(t *testing.T) {
-	packets := []requestPacket{
-		&sshFxpRemovePacket{ID: 0, Filename: "foo"},
-		&sshFxpOpenPacket{ID: 1},
-		&sshFxpWritePacket{ID: 2, Handle: "foo"},
-		&sshFxpWritePacket{ID: 3, Handle: "foo"},
-		&sshFxpWritePacket{ID: 4, Handle: "foo"},
-		&sshFxpWritePacket{ID: 5, Handle: "foo"},
-		&sshFxpClosePacket{ID: 6, Handle: "foo"},
-		&sshFxpRemovePacket{ID: 7, Filename: "foo"},
-	}
-
-	recvChan := make(chan requestPacket, len(packets)+1)
-	sender := newTestSender()
-	pm := newPktMgr(sender)
-	svr := Server{pktMgr: pm}
-	wg := sync.WaitGroup{}
-	wg.Add(len(packets))
-	worker := func(ch requestChan) {
-		for pkt := range ch {
-			if _, ok := pkt.(*sshFxpWritePacket); ok {
-				// sleep to cause writes to come after close/remove
-				time.Sleep(time.Millisecond)
-			}
-			pm.working.Done()
-			recvChan <- pkt
-			wg.Done()
-		}
-	}
-	pktChan := svr.sftpServerWorkers(worker)
-	for _, p := range packets {
-		pktChan <- p
-	}
-	wg.Wait()
-	close(recvChan)
-	received := []requestPacket{}
-	for p := range recvChan {
-		received = append(received, p)
-	}
-	if received[len(received)-2].id() != packets[len(packets)-2].id() {
-		t.Fatal("Packets processed out of order1:", received, packets)
-	}
-	if received[len(received)-1].id() != packets[len(packets)-1].id() {
-		t.Fatal("Packets processed out of order2:", received, packets)
-	}
-}
-
 // test that server handles concurrent requests correctly
 func TestConcurrentRequests(t *testing.T) {
 	client, server := clientServerPair(t)