Merge pull request #1283 from phausler/swift-4.0-branch-data_sync

diff --git a/Foundation/URLSession/URLSession.swift b/Foundation/URLSession/URLSession.swift
index 9dbb84c..bcf8589 100644
--- a/Foundation/URLSession/URLSession.swift
+++ b/Foundation/URLSession/URLSession.swift
@@ -172,11 +172,13 @@
 import Dispatch
 
 
+fileprivate let globalVarSyncQ = DispatchQueue(label: "org.swift.Foundation.URLSession.GlobalVarSyncQ")
 fileprivate var sessionCounter = Int32(0)
 fileprivate func nextSessionIdentifier() -> Int32 {
-    //TODO: find an alternative for OSAtomicIncrement32Barrier() on Linux
-    sessionCounter += 1
-    return sessionCounter
+    return globalVarSyncQ.sync {
+        sessionCounter += 1
+        return sessionCounter
+    }
 }
 public let NSURLSessionTransferSizeUnknown: Int64 = -1
 
@@ -397,9 +399,11 @@
 
 fileprivate extension URLSession {
     func createNextTaskIdentifier() -> Int {
-        let i = nextTaskIdentifier
-        nextTaskIdentifier += 1
-        return i
+        return workQueue.sync {
+            let i = nextTaskIdentifier
+            nextTaskIdentifier += 1
+            return i
+        }
     }
 }
 
diff --git a/Foundation/URLSession/URLSessionTask.swift b/Foundation/URLSession/URLSessionTask.swift
index 7dcc66e..92c9cee 100644
--- a/Foundation/URLSession/URLSessionTask.swift
+++ b/Foundation/URLSession/URLSessionTask.swift
@@ -30,13 +30,10 @@
     internal var session: URLSessionProtocol! //change to nil when task completes
     internal let body: _Body
     fileprivate var _protocol: URLProtocol? = nil
-
+    private let syncQ = DispatchQueue(label: "org.swift.URLSessionTask.SyncQ")
+    
     /// All operations must run on this queue.
     internal let workQueue: DispatchQueue 
-    /// Using dispatch semaphore to make public attributes thread safe.
-    /// A semaphore is a simpler option against the usage of concurrent queue
-    /// as the critical sections are very short.
-    fileprivate let semaphore = DispatchSemaphore(value: 1)    
     
     public override init() {
         // Darwin Foundation oddly allows calling this initializer, even though
@@ -62,7 +59,8 @@
     }
     internal init(session: URLSession, request: URLRequest, taskIdentifier: Int, body: _Body) {
         self.session = session
-        self.workQueue = session.workQueue
+        /* Make sure we actually have a serial queue, since it is used for synchronization. */
+        self.workQueue = DispatchQueue.init(label: "org.swift.URLSessionTask.WorkQueue", target: session.workQueue)
         self.taskIdentifier = taskIdentifier
         self.originalRequest = request
         self.body = body
@@ -108,31 +106,19 @@
     /// May differ from originalRequest due to http server redirection
     /*@NSCopying*/ open internal(set) var currentRequest: URLRequest? {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._currentRequest
+            return self.syncQ.sync { return self._currentRequest }
         }
         set {
-            semaphore.wait()
-            self._currentRequest = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._currentRequest = newValue }
         }
     }
     fileprivate var _currentRequest: URLRequest? = nil
     /*@NSCopying*/ open internal(set) var response: URLResponse? {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._response
+            return self.syncQ.sync { return self._response }
         }
         set {
-            semaphore.wait()
-            self._response = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._response = newValue }
         }
     }
     fileprivate var _response: URLResponse? = nil
@@ -145,16 +131,10 @@
     /// Number of body bytes already received
     open internal(set) var countOfBytesReceived: Int64 {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._countOfBytesReceived
+            return self.syncQ.sync { return self._countOfBytesReceived }
         }
         set {
-            semaphore.wait()
-            self._countOfBytesReceived = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._countOfBytesReceived = newValue }
         }
     }
     fileprivate var _countOfBytesReceived: Int64 = 0
@@ -162,16 +142,10 @@
     /// Number of body bytes already sent */
     open internal(set) var countOfBytesSent: Int64 {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._countOfBytesSent
+            return self.syncQ.sync { return self._countOfBytesSent }
         }
         set {
-            semaphore.wait()
-            self._countOfBytesSent = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._countOfBytesSent = newValue }
         }
     }
     
@@ -211,16 +185,10 @@
      */
     open var state: URLSessionTask.State {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._state
+            return self.syncQ.sync { self._state }
         }
         set {
-            semaphore.wait()
-            self._state = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._state = newValue }
         }
     }
     fileprivate var _state: URLSessionTask.State = .suspended
@@ -315,16 +283,10 @@
     /// URLSessionTask.highPriority, but use is not restricted to these.
     open var priority: Float {
         get {
-            semaphore.wait()
-            defer {
-                semaphore.signal()
-            }
-            return self._priority
+            return self.syncQ.sync { return self._priority } // syncQ like the other properties; workQueue.sync would deadlock if called on workQueue
         }
         set {
-            semaphore.wait()
-            self._priority = newValue
-            semaphore.signal()
+            self.syncQ.sync { self._priority = newValue } // same private queue that guards the other synchronized properties
         }
     }
     fileprivate var _priority: Float = URLSessionTask.defaultPriority
@@ -569,7 +531,9 @@
             session.delegateQueue.addOperation {
                 delegate.urlSession(session, task: task, didCompleteWithError: nil)
                 task.state = .completed
-                session.taskRegistry.remove(task)
+                task.workQueue.async {
+                    session.taskRegistry.remove(task)
+                }
             }
         case .noDelegate:
             task.state = .completed
@@ -625,7 +589,9 @@
             session.delegateQueue.addOperation {
                 delegate.urlSession(session, task: task, didCompleteWithError: error as Error)
                 task.state = .completed
-                session.taskRegistry.remove(task)
+                task.workQueue.async {
+                    session.taskRegistry.remove(task)
+                }
             }
         case .noDelegate:
             task.state = .completed
@@ -634,7 +600,9 @@
             session.delegateQueue.addOperation {
                 completion(nil, nil, error)
                 task.state = .completed
-                session.taskRegistry.remove(task)
+                task.workQueue.async {
+                    session.taskRegistry.remove(task)
+                }
             }
         case .downloadCompletionHandler(let completion):
             session.delegateQueue.addOperation {
diff --git a/Foundation/URLSession/http/MultiHandle.swift b/Foundation/URLSession/http/MultiHandle.swift
index bb2acaf..506c9a9 100644
--- a/Foundation/URLSession/http/MultiHandle.swift
+++ b/Foundation/URLSession/http/MultiHandle.swift
@@ -39,11 +39,7 @@
         let group = DispatchGroup()
         fileprivate var easyHandles: [_EasyHandle] = []
         fileprivate var timeoutSource: _TimeoutSource? = nil
-
-        //SR-4567: we need to synchronize the register/unregister commands to the epoll machinery in libdispatch
-        fileprivate let commandQueue: DispatchQueue = DispatchQueue(label: "Register-unregister synchronization")
-        fileprivate var cancelInProgress: DispatchSemaphore? = nil
-
+        
         init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
             queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
             setupCallbacks()
@@ -103,33 +99,25 @@
         // through libdispatch (DispatchSource) and store the source(s) inside
         // a `SocketSources` which we in turn store inside libcurl's multi handle
         // by means of curl_multi_assign() -- we retain the object fist.
-        commandQueue.async {
-            self.cancelInProgress?.wait()
-            self.cancelInProgress = nil
-
-            let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
-            var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
-            if socketSources == nil && action.needsSource {
-                let s = _SocketSources()
-                let p = Unmanaged.passRetained(s).toOpaque()
-                CFURLSessionMultiHandleAssign(self.rawHandle, socket, UnsafeMutableRawPointer(p))
-                socketSources = s
-            } else if socketSources != nil && action == .unregister {
-                //the beginning of an unregister operation
-                self.cancelInProgress = DispatchSemaphore(value: 0)
-                // We need to release the stored pointer:
-                if let opaque = socketSourcePtr {
-                    Unmanaged<_SocketSources>.fromOpaque(opaque).release()
-                }
-                socketSources?.tearDown(self.cancelInProgress)
-                socketSources = nil
+        let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
+        var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
+        if socketSources == nil && action.needsSource {
+            let s = _SocketSources()
+            let p = Unmanaged.passRetained(s).toOpaque()
+            CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
+            socketSources = s
+        } else if socketSources != nil && action == .unregister {
+            // We need to release the stored pointer:
+            if let opaque = socketSourcePtr {
+                Unmanaged<_SocketSources>.fromOpaque(opaque).release()
             }
-            if let ss = socketSources {
-                let handler = DispatchWorkItem { [weak self] in
-                    self?.performAction(for: socket)
-                }
-                ss.createSources(with: action, fileDescriptor: Int(socket), queue: self.queue, handler: handler)
+            socketSources = nil
+        }
+        if let ss = socketSources {
+            let handler = DispatchWorkItem { [weak self] in
+                self?.performAction(for: socket)
             }
+            ss.createSources(with: action, fileDescriptor: Int(socket), queue: queue, handler: handler)
         }
         return 0
     }
@@ -411,18 +399,12 @@
         s.resume()
     }
 
-    func tearDown(_ cancelInProgress: DispatchSemaphore?) {
-        let cancelHandler = DispatchWorkItem {
-            //the real end of an unregister operation!
-            cancelInProgress?.signal()
-        }
+    func tearDown() {
         if let s = readSource {
-            s.setCancelHandler(handler: cancelHandler)
             s.cancel()
         }
         readSource = nil
         if let s = writeSource {
-            s.setCancelHandler(handler: cancelHandler)
             s.cancel()
         }
         writeSource = nil
diff --git a/TestFoundation/HTTPServer.swift b/TestFoundation/HTTPServer.swift
index b129f74..6f6f035 100644
--- a/TestFoundation/HTTPServer.swift
+++ b/TestFoundation/HTTPServer.swift
@@ -26,6 +26,8 @@
 #endif
 
 public let globalDispatchQueue = DispatchQueue.global()
+public let dispatchQueueMake: (String) -> DispatchQueue = { DispatchQueue.init(label: $0) }
+public let dispatchGroupMake: () -> DispatchGroup = DispatchGroup.init
 
 struct _HTTPUtils {
     static let CRLF = "\r\n"
diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift
index cf44ab6..68339fc 100644
--- a/TestFoundation/TestURLSession.swift
+++ b/TestFoundation/TestURLSession.swift
@@ -43,6 +43,7 @@
             ("test_illegalHTTPServerResponses", test_illegalHTTPServerResponses),
             ("test_dataTaskWithSharedDelegate", test_dataTaskWithSharedDelegate),
             ("test_simpleUploadWithDelegate", test_simpleUploadWithDelegate),
+            ("test_concurrentRequests", test_concurrentRequests),
         ]
     }
     
@@ -459,6 +460,34 @@
         task.resume()
         waitForExpectations(timeout: 20)
     }
+
+    /// Starts 640 GET requests from work items on the global queue and asserts the capital for each task without an error.
+    func test_concurrentRequests() {
+        let syncQ = dispatchQueueMake("test_concurrentRequests.syncQ")
+        var dataTasks: [DataTask] = []
+        let g = dispatchGroupMake()
+        for f in 0..<640 {
+            g.enter()
+            let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Nepal"
+            let expectation = self.expectation(description: "GET \(urlString) [\(f)]: with a delegate")
+            globalDispatchQueue.async {
+                let url = URL(string: urlString)!
+                let d = DataTask(with: expectation)
+                d.run(with: url)
+                // `dataTasks` is only touched on syncQ, so the concurrent work items never append to it directly.
+                syncQ.async {
+                    dataTasks.append(d)
+                    g.leave()
+                }
+            }
+        }
+        waitForExpectations(timeout: 12)
+        g.wait() // returns once every work item has appended its task and left the group
+        for d in syncQ.sync(execute: { dataTasks }) where !d.error {
+            XCTAssertEqual(d.capital, "Kathmandu", "test_concurrentRequests returned an unexpected result")
+        }
+    }
+
 }
 
 class SharedDelegate: NSObject {
@@ -488,19 +517,73 @@
 }
 
 class DataTask : NSObject {
+    let syncQ = dispatchQueueMake("org.swift.TestFoundation.TestURLSession.DataTask.syncQ")
     let dataTaskExpectation: XCTestExpectation!
-    var capital = "unknown"
-    var session: URLSession! = nil
-    var task: URLSessionDataTask! = nil
-    var cancelExpectation: XCTestExpectation?
-    var responseReceivedExpectation: XCTestExpectation?
-    var protocols: [AnyClass]?
+    let protocols: [AnyClass]?
+
+    /* All of the following `_xyz` variables must only be accessed while synchronized on `syncQ`.
+       We can't simply assert that we are on the main thread here, because these values are also modified
+       from the URLSessionDataDelegate extension of DataTask.
+     */
+    var _capital = "unknown"
+    var capital: String {
+        get {
+            return self.syncQ.sync { self._capital }
+        }
+        set {
+            self.syncQ.sync { self._capital = newValue }
+        }
+    }
+    var _session: URLSession! = nil
+    var session: URLSession! {
+        get {
+            return self.syncQ.sync { self._session }
+        }
+        set {
+            self.syncQ.sync { self._session = newValue }
+        }
+    }
+    var _task: URLSessionDataTask! = nil
+    var task: URLSessionDataTask! {
+        get {
+            return self.syncQ.sync { self._task }
+        }
+        set {
+            self.syncQ.sync { self._task = newValue }
+        }
+    }
+    var _cancelExpectation: XCTestExpectation?
+    var cancelExpectation: XCTestExpectation? {
+        get {
+            return self.syncQ.sync { self._cancelExpectation }
+        }
+        set {
+            self.syncQ.sync { self._cancelExpectation = newValue }
+        }
+    }
+    var _responseReceivedExpectation: XCTestExpectation?
+    var responseReceivedExpectation: XCTestExpectation? {
+        get {
+            return self.syncQ.sync { self._responseReceivedExpectation }
+        }
+        set {
+            self.syncQ.sync { self._responseReceivedExpectation = newValue }
+        }
+    }
     
-    public var error = false
+    private var _error = false
+    public var error: Bool {
+        get {
+            return self.syncQ.sync { self._error }
+        }
+        set {
+            self.syncQ.sync { self._error = newValue }
+        }
+    }
     
     init(with expectation: XCTestExpectation, protocolClasses: [AnyClass]? = nil) {
         dataTaskExpectation = expectation
-	protocols = protocolClasses
+        protocols = protocolClasses
     }
     
     func run(with request: URLRequest) {