Merge pull request #1283 from phausler/swift-4.0-branch-data_sync
diff --git a/Foundation/URLSession/URLSession.swift b/Foundation/URLSession/URLSession.swift
index 9dbb84c..bcf8589 100644
--- a/Foundation/URLSession/URLSession.swift
+++ b/Foundation/URLSession/URLSession.swift
@@ -172,11 +172,13 @@
import Dispatch
+fileprivate let globalVarSyncQ = DispatchQueue(label: "org.swift.Foundation.URLSession.GlobalVarSyncQ")
fileprivate var sessionCounter = Int32(0)
fileprivate func nextSessionIdentifier() -> Int32 {
- //TODO: find an alternative for OSAtomicIncrement32Barrier() on Linux
- sessionCounter += 1
- return sessionCounter
+ return globalVarSyncQ.sync {
+ sessionCounter += 1
+ return sessionCounter
+ }
}
public let NSURLSessionTransferSizeUnknown: Int64 = -1
@@ -397,9 +399,11 @@
fileprivate extension URLSession {
func createNextTaskIdentifier() -> Int {
- let i = nextTaskIdentifier
- nextTaskIdentifier += 1
- return i
+ return workQueue.sync {
+ let i = nextTaskIdentifier
+ nextTaskIdentifier += 1
+ return i
+ }
}
}
diff --git a/Foundation/URLSession/URLSessionTask.swift b/Foundation/URLSession/URLSessionTask.swift
index 7dcc66e..92c9cee 100644
--- a/Foundation/URLSession/URLSessionTask.swift
+++ b/Foundation/URLSession/URLSessionTask.swift
@@ -30,13 +30,10 @@
internal var session: URLSessionProtocol! //change to nil when task completes
internal let body: _Body
fileprivate var _protocol: URLProtocol? = nil
-
+ private let syncQ = DispatchQueue(label: "org.swift.URLSessionTask.SyncQ")
+
/// All operations must run on this queue.
internal let workQueue: DispatchQueue
- /// Using dispatch semaphore to make public attributes thread safe.
- /// A semaphore is a simpler option against the usage of concurrent queue
- /// as the critical sections are very short.
- fileprivate let semaphore = DispatchSemaphore(value: 1)
public override init() {
// Darwin Foundation oddly allows calling this initializer, even though
@@ -62,7 +59,8 @@
}
internal init(session: URLSession, request: URLRequest, taskIdentifier: Int, body: _Body) {
self.session = session
- self.workQueue = session.workQueue
+ /* make sure we're actually having a serial queue as it's used for synchronization */
+ self.workQueue = DispatchQueue.init(label: "org.swift.URLSessionTask.WorkQueue", target: session.workQueue)
self.taskIdentifier = taskIdentifier
self.originalRequest = request
self.body = body
@@ -108,31 +106,19 @@
/// May differ from originalRequest due to http server redirection
/*@NSCopying*/ open internal(set) var currentRequest: URLRequest? {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._currentRequest
+ return self.syncQ.sync { return self._currentRequest }
}
set {
- semaphore.wait()
- self._currentRequest = newValue
- semaphore.signal()
+ self.syncQ.sync { self._currentRequest = newValue }
}
}
fileprivate var _currentRequest: URLRequest? = nil
/*@NSCopying*/ open internal(set) var response: URLResponse? {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._response
+ return self.syncQ.sync { return self._response }
}
set {
- semaphore.wait()
- self._response = newValue
- semaphore.signal()
+ self.syncQ.sync { self._response = newValue }
}
}
fileprivate var _response: URLResponse? = nil
@@ -145,16 +131,10 @@
/// Number of body bytes already received
open internal(set) var countOfBytesReceived: Int64 {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._countOfBytesReceived
+ return self.syncQ.sync { return self._countOfBytesReceived }
}
set {
- semaphore.wait()
- self._countOfBytesReceived = newValue
- semaphore.signal()
+ self.syncQ.sync { self._countOfBytesReceived = newValue }
}
}
fileprivate var _countOfBytesReceived: Int64 = 0
@@ -162,16 +142,10 @@
/// Number of body bytes already sent */
open internal(set) var countOfBytesSent: Int64 {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._countOfBytesSent
+ return self.syncQ.sync { return self._countOfBytesSent }
}
set {
- semaphore.wait()
- self._countOfBytesSent = newValue
- semaphore.signal()
+ self.syncQ.sync { self._countOfBytesSent = newValue }
}
}
@@ -211,16 +185,10 @@
*/
open var state: URLSessionTask.State {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._state
+ return self.syncQ.sync { self._state }
}
set {
- semaphore.wait()
- self._state = newValue
- semaphore.signal()
+ self.syncQ.sync { self._state = newValue }
}
}
fileprivate var _state: URLSessionTask.State = .suspended
@@ -315,16 +283,10 @@
/// URLSessionTask.highPriority, but use is not restricted to these.
open var priority: Float {
get {
- semaphore.wait()
- defer {
- semaphore.signal()
- }
- return self._priority
+ return self.workQueue.sync { return self._priority }
}
set {
- semaphore.wait()
- self._priority = newValue
- semaphore.signal()
+ self.workQueue.sync { self._priority = newValue }
}
}
fileprivate var _priority: Float = URLSessionTask.defaultPriority
@@ -569,7 +531,9 @@
session.delegateQueue.addOperation {
delegate.urlSession(session, task: task, didCompleteWithError: nil)
task.state = .completed
- session.taskRegistry.remove(task)
+ task.workQueue.async {
+ session.taskRegistry.remove(task)
+ }
}
case .noDelegate:
task.state = .completed
@@ -625,7 +589,9 @@
session.delegateQueue.addOperation {
delegate.urlSession(session, task: task, didCompleteWithError: error as Error)
task.state = .completed
- session.taskRegistry.remove(task)
+ task.workQueue.async {
+ session.taskRegistry.remove(task)
+ }
}
case .noDelegate:
task.state = .completed
@@ -634,7 +600,9 @@
session.delegateQueue.addOperation {
completion(nil, nil, error)
task.state = .completed
- session.taskRegistry.remove(task)
+ task.workQueue.async {
+ session.taskRegistry.remove(task)
+ }
}
case .downloadCompletionHandler(let completion):
session.delegateQueue.addOperation {
diff --git a/Foundation/URLSession/http/MultiHandle.swift b/Foundation/URLSession/http/MultiHandle.swift
index bb2acaf..506c9a9 100644
--- a/Foundation/URLSession/http/MultiHandle.swift
+++ b/Foundation/URLSession/http/MultiHandle.swift
@@ -39,11 +39,7 @@
let group = DispatchGroup()
fileprivate var easyHandles: [_EasyHandle] = []
fileprivate var timeoutSource: _TimeoutSource? = nil
-
- //SR-4567: we need to synchronize the register/unregister commands to the epoll machinery in libdispatch
- fileprivate let commandQueue: DispatchQueue = DispatchQueue(label: "Register-unregister synchronization")
- fileprivate var cancelInProgress: DispatchSemaphore? = nil
-
+
init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
setupCallbacks()
@@ -103,33 +99,25 @@
// through libdispatch (DispatchSource) and store the source(s) inside
// a `SocketSources` which we in turn store inside libcurl's multi handle
// by means of curl_multi_assign() -- we retain the object fist.
- commandQueue.async {
- self.cancelInProgress?.wait()
- self.cancelInProgress = nil
-
- let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
- var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
- if socketSources == nil && action.needsSource {
- let s = _SocketSources()
- let p = Unmanaged.passRetained(s).toOpaque()
- CFURLSessionMultiHandleAssign(self.rawHandle, socket, UnsafeMutableRawPointer(p))
- socketSources = s
- } else if socketSources != nil && action == .unregister {
- //the beginning of an unregister operation
- self.cancelInProgress = DispatchSemaphore(value: 0)
- // We need to release the stored pointer:
- if let opaque = socketSourcePtr {
- Unmanaged<_SocketSources>.fromOpaque(opaque).release()
- }
- socketSources?.tearDown(self.cancelInProgress)
- socketSources = nil
+ let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
+ var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
+ if socketSources == nil && action.needsSource {
+ let s = _SocketSources()
+ let p = Unmanaged.passRetained(s).toOpaque()
+ CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
+ socketSources = s
+ } else if socketSources != nil && action == .unregister {
+ // We need to release the stored pointer:
+ if let opaque = socketSourcePtr {
+ Unmanaged<_SocketSources>.fromOpaque(opaque).release()
}
- if let ss = socketSources {
- let handler = DispatchWorkItem { [weak self] in
- self?.performAction(for: socket)
- }
- ss.createSources(with: action, fileDescriptor: Int(socket), queue: self.queue, handler: handler)
+ socketSources = nil
+ }
+ if let ss = socketSources {
+ let handler = DispatchWorkItem { [weak self] in
+ self?.performAction(for: socket)
}
+ ss.createSources(with: action, fileDescriptor: Int(socket), queue: queue, handler: handler)
}
return 0
}
@@ -411,18 +399,12 @@
s.resume()
}
- func tearDown(_ cancelInProgress: DispatchSemaphore?) {
- let cancelHandler = DispatchWorkItem {
- //the real end of an unregister operation!
- cancelInProgress?.signal()
- }
+ func tearDown() {
if let s = readSource {
- s.setCancelHandler(handler: cancelHandler)
s.cancel()
}
readSource = nil
if let s = writeSource {
- s.setCancelHandler(handler: cancelHandler)
s.cancel()
}
writeSource = nil
diff --git a/TestFoundation/HTTPServer.swift b/TestFoundation/HTTPServer.swift
index b129f74..6f6f035 100644
--- a/TestFoundation/HTTPServer.swift
+++ b/TestFoundation/HTTPServer.swift
@@ -26,6 +26,8 @@
#endif
public let globalDispatchQueue = DispatchQueue.global()
+public let dispatchQueueMake: (String) -> DispatchQueue = { DispatchQueue.init(label: $0) }
+public let dispatchGroupMake: () -> DispatchGroup = DispatchGroup.init
struct _HTTPUtils {
static let CRLF = "\r\n"
diff --git a/TestFoundation/TestURLSession.swift b/TestFoundation/TestURLSession.swift
index cf44ab6..68339fc 100644
--- a/TestFoundation/TestURLSession.swift
+++ b/TestFoundation/TestURLSession.swift
@@ -43,6 +43,7 @@
("test_illegalHTTPServerResponses", test_illegalHTTPServerResponses),
("test_dataTaskWithSharedDelegate", test_dataTaskWithSharedDelegate),
("test_simpleUploadWithDelegate", test_simpleUploadWithDelegate),
+ ("test_concurrentRequests", test_concurrentRequests),
]
}
@@ -459,6 +460,34 @@
task.resume()
waitForExpectations(timeout: 20)
}
+
+ func test_concurrentRequests() {
+ let syncQ = dispatchQueueMake("test_dataTaskWithURL.syncQ")
+ var dataTasks: [DataTask] = []
+ let g = dispatchGroupMake()
+ for f in 0..<640 {
+ g.enter()
+ let urlString = "http://127.0.0.1:\(TestURLSession.serverPort)/Nepal"
+ let expectation = self.expectation(description: "GET \(urlString) [\(f)]: with a delegate")
+ globalDispatchQueue.async {
+ let url = URL(string: urlString)!
+ let d = DataTask(with: expectation)
+ d.run(with: url)
+ syncQ.async {
+ dataTasks.append(d)
+ g.leave()
+ }
+ }
+ }
+ waitForExpectations(timeout: 12)
+ g.wait()
+ for d in syncQ.sync(execute: {dataTasks}) {
+ if !d.error {
+ XCTAssertEqual(d.capital, "Kathmandu", "test_dataTaskWithURLRequest returned an unexpected result")
+ }
+ }
+ }
+
}
class SharedDelegate: NSObject {
@@ -488,19 +517,73 @@
}
class DataTask : NSObject {
+ let syncQ = dispatchQueueMake("org.swift.TestFoundation.TestURLSession.DataTask.syncQ")
let dataTaskExpectation: XCTestExpectation!
- var capital = "unknown"
- var session: URLSession! = nil
- var task: URLSessionDataTask! = nil
- var cancelExpectation: XCTestExpectation?
- var responseReceivedExpectation: XCTestExpectation?
- var protocols: [AnyClass]?
+ let protocols: [AnyClass]?
+
+ /* all the following var _XYZ need to be synchronized on syncQ.
+ We can't just assert that we're on main thread here as we're modified in the URLSessionDataDelegate extension
+ for DataTask
+ */
+ var _capital = "unknown"
+ var capital: String {
+ get {
+ return self.syncQ.sync { self._capital }
+ }
+ set {
+ self.syncQ.sync { self._capital = newValue }
+ }
+ }
+ var _session: URLSession! = nil
+ var session: URLSession! {
+ get {
+ return self.syncQ.sync { self._session }
+ }
+ set {
+ self.syncQ.sync { self._session = newValue }
+ }
+ }
+ var _task: URLSessionDataTask! = nil
+ var task: URLSessionDataTask! {
+ get {
+ return self.syncQ.sync { self._task }
+ }
+ set {
+ self.syncQ.sync { self._task = newValue }
+ }
+ }
+ var _cancelExpectation: XCTestExpectation?
+ var cancelExpectation: XCTestExpectation? {
+ get {
+ return self.syncQ.sync { self._cancelExpectation }
+ }
+ set {
+ self.syncQ.sync { self._cancelExpectation = newValue }
+ }
+ }
+ var _responseReceivedExpectation: XCTestExpectation?
+ var responseReceivedExpectation: XCTestExpectation? {
+ get {
+ return self.syncQ.sync { self._responseReceivedExpectation }
+ }
+ set {
+ self.syncQ.sync { self._responseReceivedExpectation = newValue }
+ }
+ }
- public var error = false
+ private var _error = false
+ public var error: Bool {
+ get {
+ return self.syncQ.sync { self._error }
+ }
+ set {
+ self.syncQ.sync { self._error = newValue }
+ }
+ }
init(with expectation: XCTestExpectation, protocolClasses: [AnyClass]? = nil) {
dataTaskExpectation = expectation
- protocols = protocolClasses
+ protocols = protocolClasses
}
func run(with request: URLRequest) {