request-server to use new packet ordering code
The 2 servers now use all the same packet managing code to ensure
ordered processing and reply packets.
diff --git a/request-server.go b/request-server.go
index dd04584..a0877c3 100644
--- a/request-server.go
+++ b/request-server.go
@@ -28,7 +28,6 @@
type RequestServer struct {
serverConn
Handlers Handlers
- pktChan chan requestPacket
pktMgr packetManager
openRequests map[string]Request
openRequestLock sync.RWMutex
@@ -47,7 +46,6 @@
return &RequestServer{
serverConn: svrConn,
Handlers: h,
- pktChan: make(chan requestPacket, sftpServerWorkerCount),
pktMgr: newPktMgr(&svrConn),
openRequests: make(map[string]Request),
}
@@ -84,15 +82,15 @@
// Serve requests for user session
func (rs *RequestServer) Serve() error {
var wg sync.WaitGroup
- wg.Add(sftpServerWorkerCount)
- for i := 0; i < sftpServerWorkerCount; i++ {
- go func() {
- defer wg.Done()
- if err := rs.packetWorker(); err != nil {
- rs.conn.Close() // shuts down recvPacket
- }
- }()
+ wg.Add(1)
+ workerFunc := func(ch requestChan) {
+ wg.Add(1)
+ defer wg.Done()
+ if err := rs.packetWorker(ch); err != nil {
+ rs.conn.Close() // shuts down recvPacket
+ }
}
+ pktChan := rs.pktMgr.workerChan(workerFunc)
var err error
var pkt requestPacket
@@ -103,24 +101,26 @@
if err != nil {
break
}
+
pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
if err != nil {
debug("makePacket err: %v", err)
rs.conn.Close() // shuts down recvPacket
break
}
- rs.pktMgr.incomingPacket(pkt)
- rs.pktChan <- pkt
- }
- close(rs.pktChan) // shuts down sftpServerWorkers
- wg.Wait() // wait for all workers to exit
- rs.pktMgr.close() // shuts down packetManager
+ pktChan <- pkt
+ }
+ wg.Done()
+
+ close(pktChan) // shuts down sftpServerWorkers
+ wg.Wait() // wait for all workers to exit
+
return err
}
-func (rs *RequestServer) packetWorker() error {
- for pkt := range rs.pktChan {
+func (rs *RequestServer) packetWorker(pktChan chan requestPacket) error {
+ for pkt := range pktChan {
var rpkt responsePacket
switch pkt := pkt.(type) {
case *sshFxInitPacket: