Merge pull request #266 from pkg/issue265
fix race w/ open packet and stat
diff --git a/packet-manager.go b/packet-manager.go
index c53ed56..d47713b 100644
--- a/packet-manager.go
+++ b/packet-manager.go
@@ -105,33 +105,32 @@
func (s *packetManager) workerChan(runWorker func(chan orderedRequest),
) chan orderedRequest {
+ // multiple workers for faster read/writes
rwChan := make(chan orderedRequest, SftpServerWorkerCount)
for i := 0; i < SftpServerWorkerCount; i++ {
runWorker(rwChan)
}
+ // single worker to enforce sequential processing of everything else
cmdChan := make(chan orderedRequest)
runWorker(cmdChan)
pktChan := make(chan orderedRequest, SftpServerWorkerCount)
go func() {
- // start with cmdChan
- curChan := cmdChan
for pkt := range pktChan {
- // on file open packet, switch to rwChan
switch pkt.requestPacket.(type) {
- case *sshFxpOpenPacket:
- curChan = rwChan
- // on file close packet, switch back to cmdChan
- // after waiting for any reads/writes to finish
+ case *sshFxpReadPacket, *sshFxpWritePacket:
+ s.incomingPacket(pkt)
+ rwChan <- pkt
+ continue
case *sshFxpClosePacket:
- // wait for rwChan to finish
+ // wait for reads/writes to finish when file is closed
+ // incomingPacket() call must occur after this
s.working.Wait()
- // stop using rwChan
- curChan = cmdChan
}
s.incomingPacket(pkt)
- curChan <- pkt
+ // all non-RW use sequential cmdChan
+ cmdChan <- pkt
}
close(rwChan)
close(cmdChan)
diff --git a/server_test.go b/server_test.go
index 49e26b7..5568347 100644
--- a/server_test.go
+++ b/server_test.go
@@ -3,6 +3,7 @@
import (
"io"
"os"
+ "path"
"regexp"
"sync"
"syscall"
@@ -278,3 +279,51 @@
assert.Equal(t, tc.pkt, statusFromError(tc.pkt, tc.err))
}
}
+
+// TestOpenStatRace tests for a race between an open immediately followed by a
+// stat. Previously the open packet switched the packet manager over to its
+// worker pool, so the stat packet that came in next could be handled by the
+// pool and return faster than the open (yielding a file-not-found error).
+// The test below does not trigger the race by itself, however; a small sleep
+// had to be added to the open-packet handling to make it show up. It would be
+// nice to inject that delay from the test, but right now there is no good
+// place for it. Once the server is converted into a request-server backend it
+// might be possible to do something with the runWorker function passed into
+// the packet manager, but with the two server implementations it just doesn't
+// fit well right now.
+func TestOpenStatRace(t *testing.T) {
+	client, server := clientServerPair(t)
+	defer client.Close()
+	defer server.Close()
+
+	// The open normally completes too fast for the race to show up here;
+	// reproducing it needs an artificial delay in the server's open handling.
+	tmppath := path.Join(os.TempDir(), "stat_race")
+	defer os.Remove(tmppath) // clean up even on an early exit (panic/FailNow)
+	pflags := flags(os.O_RDWR | os.O_CREATE | os.O_TRUNC)
+	ch := make(chan result, 2) // one reply per dispatched request
+	id1 := client.nextID()
+	client.dispatchRequest(ch, sshFxpOpenPacket{
+		ID:     id1,
+		Path:   tmppath,
+		Pflags: pflags,
+	})
+	id2 := client.nextID()
+	client.dispatchRequest(ch, sshFxpLstatPacket{ID: id2, Path: tmppath})
+	// testreply consumes one reply; any STATUS reply is treated as a failure.
+	testreply := func(id uint32) {
+		r := <-ch
+		switch {
+		case r.err != nil: // the request could not be sent at all
+			assert.NoError(t, r.err, "sending request")
+		case r.typ == ssh_FXP_ATTRS, r.typ == ssh_FXP_HANDLE: // expected replies
+		case r.typ == ssh_FXP_STATUS:
+			err := normaliseError(unmarshalStatus(id, r.data))
+			assert.NoError(t, err, "race hit, stat before open")
+		default:
+			assert.Fail(t, "Unexpected type", "got packet type %d", r.typ)
+		}
+	}
+	testreply(id1)
+	testreply(id2)
+}