avoid data race in worker creation
There is a data race with the waitgroup (wg) object used to synchronize
the workers with the server exit. The workers called wg.Add()
asynchronously and it was possible for the Wait() to get hit before any
of the Add() calls were made under certain conditions. I only ever saw
this sporadically in the Travis tests.
This fixes it by making the wg.Add() calls synchronous: each worker is
added to the waitgroup before its goroutine is started.
diff --git a/packet-manager.go b/packet-manager.go
index a204567..6d1a8e5 100644
--- a/packet-manager.go
+++ b/packet-manager.go
@@ -61,15 +61,15 @@
// The goal is to process packets in the order they are received as is
// requires by section 7 of the RFC, while maximizing throughput of file
// transfers.
-func (s *packetManager) workerChan(worker func(requestChan)) requestChan {
+func (s *packetManager) workerChan(runWorker func(requestChan)) requestChan {
rwChan := make(chan requestPacket, sftpServerWorkerCount)
for i := 0; i < sftpServerWorkerCount; i++ {
- go worker(rwChan)
+ runWorker(rwChan)
}
cmdChan := make(chan requestPacket)
- go worker(cmdChan)
+ runWorker(cmdChan)
pktChan := make(chan requestPacket, sftpServerWorkerCount)
go func() {
diff --git a/packet-manager_test.go b/packet-manager_test.go
index 39299c6..08df054 100644
--- a/packet-manager_test.go
+++ b/packet-manager_test.go
@@ -108,18 +108,20 @@
pktMgr := newPktMgr(sender)
wg := sync.WaitGroup{}
wg.Add(len(packets))
- worker := func(ch requestChan) {
- for pkt := range ch {
- if _, ok := pkt.(*sshFxpWritePacket); ok {
- // sleep to cause writes to come after close/remove
- time.Sleep(time.Millisecond)
+ runWorker := func(ch requestChan) {
+ go func() {
+ for pkt := range ch {
+ if _, ok := pkt.(*sshFxpWritePacket); ok {
+ // sleep to cause writes to come after close/remove
+ time.Sleep(time.Millisecond)
+ }
+ pktMgr.working.Done()
+ recvChan <- pkt
+ wg.Done()
}
- pktMgr.working.Done()
- recvChan <- pkt
- wg.Done()
- }
+ }()
}
- pktChan := pktMgr.workerChan(worker)
+ pktChan := pktMgr.workerChan(runWorker)
for _, p := range packets {
pktChan <- p
}
diff --git a/request-server.go b/request-server.go
index 7a94801..b51a10b 100644
--- a/request-server.go
+++ b/request-server.go
@@ -82,15 +82,16 @@
// Serve requests for user session
func (rs *RequestServer) Serve() error {
var wg sync.WaitGroup
- wg.Add(1)
- workerFunc := func(ch requestChan) {
+ runWorker := func(ch requestChan) {
wg.Add(1)
- defer wg.Done()
- if err := rs.packetWorker(ch); err != nil {
- rs.conn.Close() // shuts down recvPacket
- }
+ go func() {
+ defer wg.Done()
+ if err := rs.packetWorker(ch); err != nil {
+ rs.conn.Close() // shuts down recvPacket
+ }
+ }()
}
- pktChan := rs.pktMgr.workerChan(workerFunc)
+ pktChan := rs.pktMgr.workerChan(runWorker)
var err error
var pkt requestPacket
@@ -111,7 +112,6 @@
pktChan <- pkt
}
- wg.Done()
close(pktChan) // shuts down sftpServerWorkers
wg.Wait() // wait for all workers to exit
diff --git a/server.go b/server.go
index db3270a..42afc9c 100644
--- a/server.go
+++ b/server.go
@@ -282,15 +282,16 @@
// is stopped.
func (svr *Server) Serve() error {
var wg sync.WaitGroup
- wg.Add(1)
- workerFunc := func(ch requestChan) {
+ runWorker := func(ch requestChan) {
wg.Add(1)
- defer wg.Done()
- if err := svr.sftpServerWorker(ch); err != nil {
- svr.conn.Close() // shuts down recvPacket
- }
+ go func() {
+ defer wg.Done()
+ if err := svr.sftpServerWorker(ch); err != nil {
+ svr.conn.Close() // shuts down recvPacket
+ }
+ }()
}
- pktChan := svr.pktMgr.workerChan(workerFunc)
+ pktChan := svr.pktMgr.workerChan(runWorker)
var err error
var pkt requestPacket
@@ -311,7 +312,6 @@
pktChan <- pkt
}
- wg.Done()
close(pktChan) // shuts down sftpServerWorkers
wg.Wait() // wait for all workers to exit