blob: d6bbb24b3eac1b51a94f01f5bbba4c2eb168670e [file] [log] [blame]
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
// -----------------------------------------------------------------------------
///
/// libcurl *multi handle* wrapper.
/// These are libcurl helpers for the URLSession API code.
/// - SeeAlso: https://curl.haxx.se/libcurl/c/
/// - SeeAlso: NSURLSession.swift
///
// -----------------------------------------------------------------------------
import CoreFoundation
import Dispatch
extension URLSession {
    /// Thin wrapper around the
    /// [curl multi interface](https://curl.haxx.se/libcurl/c/libcurl-multi.html).
    ///
    /// The *multi handle* manages the sockets for easy handles
    /// (`_EasyHandle`), and this implementation uses libdispatch to listen
    /// for sockets becoming read / write ready.
    ///
    /// Using `DispatchSource` allows this implementation to be non-blocking
    /// and lets all code run on the same thread / `DispatchQueue` -- thus
    /// keeping it simple.
    ///
    /// - SeeAlso: _EasyHandle
    internal final class _MultiHandle {
        /// The underlying libcurl multi handle, created together with this object.
        let rawHandle = CFURLSessionMultiHandleInit()
        /// Isolation queue labelled "MultiHandle.isolation"; it targets the
        /// work queue handed to `init`.
        let queue: DispatchQueue
        /// Dispatch group owned by this handle (not referenced by the code
        /// visible in this file).
        let group = DispatchGroup()
        /// Easy handles that are currently added to `rawHandle`.
        fileprivate var easyHandles: [_EasyHandle] = []
        /// The currently scheduled timeout timer, if any.
        fileprivate var timeoutSource: _TimeoutSource? = nil

        /// Creates the multi handle, installs the libcurl callbacks and applies
        /// the given configuration.
        ///
        /// - Parameters:
        ///   - configuration: Session configuration forwarded to `configure(with:)`.
        ///   - workQueue: Target queue of the private isolation queue.
        init(configuration: URLSession._Configuration, workQueue: DispatchQueue) {
            self.queue = DispatchQueue(label: "MultiHandle.isolation", target: workQueue)
            setupCallbacks()
            configure(with: configuration)
        }

        deinit {
            // Every easy handle is removed before the multi handle is cleaned up.
            // C.f.: <https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html>
            for easyHandle in easyHandles {
                try! CFURLSessionMultiHandleRemoveHandle(rawHandle, easyHandle.rawHandle).asError()
            }
            try! CFURLSessionMultiHandleDeinit(rawHandle).asError()
        }
    }
}
extension URLSession._MultiHandle {
    /// Applies the session configuration to the libcurl multi handle.
    ///
    /// - Parameter configuration: Supplies the maximum number of connections
    ///   per host and the pipelining preference.
    func configure(with configuration: URLSession._Configuration) {
        let connectionsPerHost = configuration.httpMaximumConnectionsPerHost
        // Raw value handed to the PIPELINING option: 3 when pipelining is
        // requested by the configuration, 2 otherwise.
        let pipeliningMode: Int = configuration.httpShouldUsePipelining ? 3 : 2

        try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionMAX_HOST_CONNECTIONS, connectionsPerHost).asError()
        try! CFURLSession_multi_setopt_l(rawHandle, CFURLSessionMultiOptionPIPELINING, pipeliningMode).asError()
        //TODO: We may want to set
        //    CFURLSessionMultiOptionMAXCONNECTS
        //    CFURLSessionMultiOptionMAX_TOTAL_CONNECTIONS
    }
}
fileprivate extension URLSession._MultiHandle {
    /// Recovers the multi handle from the opaque user-data pointer that is
    /// passed back by a libcurl callback.
    ///
    /// The pointer is read *unretained*, i.e. no ownership is transferred.
    ///
    /// - Parameter userdata: The opaque pointer received by the callback.
    /// - Returns: The multi handle, or `nil` when `userdata` is `nil`.
    static func from(callbackUserData userdata: UnsafeMutableRawPointer?) -> URLSession._MultiHandle? {
        return userdata.map { pointer in
            Unmanaged<URLSession._MultiHandle>.fromOpaque(pointer).takeUnretainedValue()
        }
    }
}
fileprivate extension URLSession._MultiHandle {
    /// Forward the libcurl callbacks into Swift methods.
    ///
    /// `self` is handed to libcurl as an *unretained* opaque pointer for both
    /// the socket and the timer callback; the callbacks turn it back into a
    /// `_MultiHandle` via `from(callbackUserData:)`.
    func setupCallbacks() {
        // Socket
        try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionSOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
        try! CFURLSession_multi_setopt_sf(rawHandle, CFURLSessionMultiOptionSOCKETFUNCTION) { (easyHandle: CFURLSessionEasyHandle, socket: CFURLSession_socket_t, what: Int32, userdata: UnsafeMutableRawPointer?, socketptr: UnsafeMutableRawPointer?) -> Int32 in
            guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
            return handle.register(socket: socket, for: easyHandle, what: what, socketSourcePtr: socketptr)
            }.asError()
        // Timeout:
        try! CFURLSession_multi_setopt_ptr(rawHandle, CFURLSessionMultiOptionTIMERDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
        try! CFURLSession_multi_setopt_tf(rawHandle, CFURLSessionMultiOptionTIMERFUNCTION) { (_, timeout: Int, userdata: UnsafeMutableRawPointer?) -> Int32 in
            guard let handle = URLSession._MultiHandle.from(callbackUserData: userdata) else { fatalError() }
            handle.updateTimeoutTimer(to: timeout)
            return 0
            }.asError()
    }

    /// Registers or unregisters libdispatch sources for a socket on behalf of
    /// libcurl.
    ///
    /// <https://curl.haxx.se/libcurl/c/CURLMOPT_SOCKETFUNCTION.html> and
    /// <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>
    ///
    /// - Parameters:
    ///   - socket: The socket libcurl wants us to (stop) watching.
    ///   - easyHandle: The easy handle the socket belongs to (unused here).
    ///   - what: Raw `CURL_POLL` / `CFURLSessionPoll` value describing the action.
    ///   - socketSourcePtr: The pointer previously stored for this socket with
    ///     `CFURLSessionMultiHandleAssign`, or `nil` if nothing was stored yet.
    /// - Returns: Always `0`.
    func register(socket: CFURLSession_socket_t, for easyHandle: CFURLSessionEasyHandle, what: Int32, socketSourcePtr: UnsafeMutableRawPointer?) -> Int32 {
        // We get this callback whenever we need to register or unregister a
        // given socket with libdispatch.
        // The `action` / `what` defines if we should register or unregister
        // that we're interested in read and/or write readiness. We will do so
        // through libdispatch (DispatchSource) and store the source(s) inside
        // a `_SocketSources` which we in turn store inside libcurl's multi handle
        // by means of curl_multi_assign() -- we retain the object first.
        let action = _SocketRegisterAction(rawValue: CFURLSessionPoll(value: what))
        var socketSources = _SocketSources.from(socketSourcePtr: socketSourcePtr)
        if socketSources == nil && action.needsSource {
            let s = _SocketSources()
            let p = Unmanaged.passRetained(s).toOpaque()
            CFURLSessionMultiHandleAssign(rawHandle, socket, UnsafeMutableRawPointer(p))
            socketSources = s
        } else if let existing = socketSources, action == .unregister {
            // Cancel the libdispatch sources explicitly before dropping the
            // retained reference, so that no handler keeps firing for a socket
            // libcurl asked us to stop watching. `existing` keeps the object
            // alive until the end of this branch.
            existing.tearDown()
            // We need to release the stored pointer (balances `passRetained`
            // from the registration branch above):
            if let opaque = socketSourcePtr {
                Unmanaged<_SocketSources>.fromOpaque(opaque).release()
            }
            socketSources = nil
        }
        if let ss = socketSources {
            // The multi handle is captured weakly: the sources must not keep
            // it alive.
            let handler = DispatchWorkItem { [weak self] in
                self?.performAction(for: socket)
            }
            ss.createSources(with: action, fileDescriptor: Int(socket), queue: queue, handler: handler)
        }
        return 0
    }

    /// What read / write ready event to register / unregister.
    ///
    /// This re-maps `CFURLSessionPoll` / `CURL_POLL`.
    enum _SocketRegisterAction {
        case none
        case registerRead
        case registerWrite
        case registerReadAndWrite
        case unregister
    }
}
internal extension URLSession._MultiHandle {
    /// Add an easy handle -- start its transfer.
    ///
    /// - Parameter handle: The easy handle to add to the multi handle.
    func add(_ handle: _EasyHandle) {
        // If this is the first handle being added, we need to `kick` the
        // underlying multi handle by calling `timeoutTimerFired` as
        // described in
        // <https://curl.haxx.se/libcurl/c/curl_multi_socket_action.html>.
        // That will initiate the registration for timeout timer and socket
        // readiness.
        let isFirstHandle = easyHandles.isEmpty
        easyHandles.append(handle)
        try! CFURLSessionMultiHandleAddHandle(rawHandle, handle.rawHandle).asError()
        guard isFirstHandle else { return }
        timeoutTimerFired()
    }

    /// Remove an easy handle -- stop its transfer.
    ///
    /// Traps if `handle` has not been added before.
    ///
    /// - Parameter handle: The easy handle to remove from the multi handle.
    func remove(_ handle: _EasyHandle) {
        if let position = easyHandles.index(of: handle) {
            easyHandles.remove(at: position)
            try! CFURLSessionMultiHandleRemoveHandle(rawHandle, handle.rawHandle).asError()
        } else {
            fatalError("Handle not in list.")
        }
    }
}
fileprivate extension URLSession._MultiHandle {
    /// This gets called when we should ask curl to perform action on a socket.
    ///
    /// - Parameter socket: The socket that became read / write ready.
    func performAction(for socket: CFURLSession_socket_t) {
        try! readAndWriteAvailableData(on: socket)
    }

    /// This gets called when our timeout timer fires.
    ///
    /// libcurl relies on us calling curl_multi_socket_action() every now and then.
    func timeoutTimerFired() {
        try! readAndWriteAvailableData(on: CFURLSessionSocketTimeout)
    }

    /// Reads / writes available data given an action, then processes any
    /// completion messages that became available.
    ///
    /// - Parameter socket: The socket to act on, or `CFURLSessionSocketTimeout`
    ///   when called because of a timeout.
    /// - Throws: The `CFURLSessionMultiCode` reported by libcurl if it is not OK.
    func readAndWriteAvailableData(on socket: CFURLSession_socket_t) throws {
        var runningHandlesCount = Int32(0)
        try CFURLSessionMultiHandleAction(rawHandle, socket, 0, &runningHandlesCount).asError()
        //TODO: Do we remove the timeout timer here if / when runningHandles == 0 ?
        readMessages()
    }

    /// Check the status of all individual transfers.
    ///
    /// libcurl refers to this as “read multi stack informationals”.
    /// Check for transfers that completed.
    func readMessages() {
        // We pop the messages one by one until libcurl has none left, i.e.
        // until the returned info carries no easy handle:
        while true {
            // count will contain the messages left in the queue
            var count = Int32(0)
            let info = CFURLSessionMultiHandleInfoRead(rawHandle, &count)
            guard let handle = info.easyHandle else { break }
            let code = info.resultCode
            completedTransfer(forEasyHandle: handle, easyCode: code)
        }
    }

    /// Transfer completed.
    ///
    /// Maps the raw libcurl easy handle back to its `_EasyHandle` wrapper and
    /// translates the libcurl result code into a URL error code. Traps if the
    /// handle is not among the added handles.
    ///
    /// - Parameters:
    ///   - handle: The raw libcurl easy handle whose transfer completed.
    ///   - easyCode: The libcurl result code of the transfer.
    func completedTransfer(forEasyHandle handle: CFURLSessionEasyHandle, easyCode: CFURLSessionEasyCode) {
        // Look up the matching wrapper:
        guard let idx = easyHandles.index(where: { $0.rawHandle == handle }) else {
            fatalError("Transfer completed for easy handle, but it is not in the list of added handles.")
        }
        let easyHandle = easyHandles[idx]
        // Find the NSURLError code
        let errorCode = easyHandle.urlErrorCode(for: easyCode)
        completedTransfer(forEasyHandle: easyHandle, errorCode: errorCode)
    }

    /// Transfer completed.
    ///
    /// - Parameters:
    ///   - handle: The wrapper of the easy handle whose transfer completed.
    ///   - errorCode: The URL error code, or `nil` if the transfer succeeded.
    func completedTransfer(forEasyHandle handle: _EasyHandle, errorCode: Int?) {
        handle.completedTransfer(withErrorCode: errorCode)
    }
}
fileprivate extension _EasyHandle {
    /// Maps a libcurl easy code (together with the connect-failure `errno` of
    /// this handle) to an error code within the `NSURLErrorDomain`.
    ///
    /// - Note: The error value is set only on failure. You can't use it to
    ///   determine *if* something failed or not, only *why* it failed.
    /// - Parameter easyCode: The result code libcurl reported for the transfer.
    /// - Returns: `nil` when `easyCode` is OK, otherwise an `NSURLError...` code.
    func urlErrorCode(for easyCode: CFURLSessionEasyCode) -> Int? {
        let connectErrno = CInt(connectFailureErrno)
        // The order of the cases matters: earlier patterns win.
        switch (easyCode, connectErrno) {
        case (CFURLSessionEasyCodeOK, _):
            return nil
        case (_, ECONNREFUSED):
            // A refused connection takes precedence over any code-specific mapping below.
            return NSURLErrorCannotConnectToHost
        case (CFURLSessionEasyCodeUNSUPPORTED_PROTOCOL, _):
            return NSURLErrorUnsupportedURL
        case (CFURLSessionEasyCodeURL_MALFORMAT, _):
            return NSURLErrorBadURL
        case (CFURLSessionEasyCodeCOULDNT_RESOLVE_HOST, _):
            // Oddly, this appears to happen for malformed URLs, too.
            return NSURLErrorCannotFindHost
        case (CFURLSessionEasyCodeRECV_ERROR, ECONNRESET),
             (CFURLSessionEasyCodeSEND_ERROR, ECONNRESET):
            return NSURLErrorNetworkConnectionLost
        case (CFURLSessionEasyCodeGOT_NOTHING, _):
            return NSURLErrorBadServerResponse
        case (CFURLSessionEasyCodeABORTED_BY_CALLBACK, _):
            return NSURLErrorUnknown // Or NSURLErrorCancelled if we're in such a state
        case (CFURLSessionEasyCodeCOULDNT_CONNECT, ETIMEDOUT),
             (CFURLSessionEasyCodeOPERATION_TIMEDOUT, _):
            return NSURLErrorTimedOut
        default:
            //TODO: Need to map to one of the NSURLError... constants
            return NSURLErrorUnknown
        }
    }
}
fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
    /// Translates a raw `CFURLSessionPoll` value into a register action.
    ///
    /// Traps for any value that is not one of the known poll constants.
    ///
    /// - Parameter rawValue: The poll value received from libcurl.
    init(rawValue: CFURLSessionPoll) {
        if rawValue == CFURLSessionPollNone {
            self = .none
        } else if rawValue == CFURLSessionPollIn {
            self = .registerRead
        } else if rawValue == CFURLSessionPollOut {
            self = .registerWrite
        } else if rawValue == CFURLSessionPollInOut {
            self = .registerReadAndWrite
        } else if rawValue == CFURLSessionPollRemove {
            self = .unregister
        } else {
            fatalError("Invalid CFURLSessionPoll value.")
        }
    }
}
// `CFURLSessionPoll` values compare equal when their wrapped `value`s are equal.
extension CFURLSessionPoll : Equatable {}
/// Compares two poll values by their underlying `value`.
public func ==(left: CFURLSessionPoll, right: CFURLSessionPoll) -> Bool {
    let sameValue = (left.value == right.value)
    return sameValue
}
fileprivate extension URLSession._MultiHandle._SocketRegisterAction {
    /// Should a libdispatch source be registered for **read** readiness?
    ///
    /// `true` for `.registerRead` and `.registerReadAndWrite` only.
    var needsReadSource: Bool {
        switch self {
        case .registerRead, .registerReadAndWrite:
            return true
        case .none, .registerWrite, .unregister:
            return false
        }
    }

    /// Should a libdispatch source be registered for **write** readiness?
    ///
    /// `true` for `.registerWrite` and `.registerReadAndWrite` only.
    var needsWriteSource: Bool {
        switch self {
        case .registerWrite, .registerReadAndWrite:
            return true
        case .none, .registerRead, .unregister:
            return false
        }
    }

    /// Should either a **read** or a **write** readiness libdispatch source be
    /// registered?
    var needsSource: Bool {
        if needsReadSource {
            return true
        }
        return needsWriteSource
    }
}
/// A helper class that wraps a libdispatch timer.
///
/// Used to implement the timeout of `URLSession._MultiHandle`. The timer is
/// resumed on creation and cancelled when the object is deallocated.
fileprivate class _TimeoutSource {
    /// The underlying repeating timer source.
    let rawSource: DispatchSource
    /// The timeout (in milliseconds) this source was created for.
    let milliseconds: Int

    /// Creates and starts a repeating timer.
    ///
    /// - Parameters:
    ///   - queue: The queue `handler` is run on.
    ///   - milliseconds: The requested timeout. The timer period is one
    ///     millisecond shorter than this, but never less than 1 ms.
    ///   - handler: The work item run every time the timer fires.
    init(queue: DispatchQueue, milliseconds: Int, handler: DispatchWorkItem) {
        self.milliseconds = milliseconds
        let timer = DispatchSource.makeTimerSource(queue: queue) as! DispatchSource
        self.rawSource = timer

        let periodInMilliseconds = max(1, milliseconds - 1)
        let period = DispatchTimeInterval.milliseconds(periodInMilliseconds)
        // Allow 1 µs of leeway for a 1 ms timeout, 1 ms of leeway otherwise.
        let leeway: DispatchTimeInterval = (milliseconds == 1) ? .microseconds(1) : .milliseconds(1)
        let firstFire = DispatchTime.now() + period

        timer.scheduleRepeating(deadline: firstFire, interval: period, leeway: leeway)
        timer.setEventHandler(handler: handler)
        timer.resume()
    }

    deinit {
        rawSource.cancel()
    }
}
fileprivate extension URLSession._MultiHandle {
    /// Called from libcurl's timer callback with the raw timeout value.
    ///
    /// <https://curl.haxx.se/libcurl/c/CURLMOPT_TIMERFUNCTION.html>
    ///
    /// - Parameter value: The timeout in milliseconds; `-1` and `0` have the
    ///   special meanings described by `_Timeout.init(timeout:)`.
    func updateTimeoutTimer(to value: Int) {
        updateTimeoutTimer(to: _Timeout(timeout: value))
    }

    /// Installs, replaces or removes the timeout timer.
    ///
    /// - Parameter timeout: The timeout requested by libcurl.
    func updateTimeoutTimer(to timeout: _Timeout) {
        // Set up a timeout timer based on the given value:
        switch timeout {
        case .none:
            timeoutSource = nil
        case .immediate:
            timeoutSource = nil
            // This method is reached from within libcurl's timer callback.
            // Calling back into libcurl synchronously from there risks
            // recursive invocations of the callback (see the warning in the
            // CURLMOPT_TIMERFUNCTION documentation), so the "fire now" request
            // is dispatched onto our isolation queue instead.
            queue.async { [weak self] in
                self?.timeoutTimerFired()
            }
        case .milliseconds(let milliseconds):
            // Only create a new timer if there is none yet, or if the existing
            // one was created for a different timeout.
            if timeoutSource?.milliseconds != milliseconds {
                //TODO: Could simply change the existing timer by calling
                // dispatch_source_set_timer() again.
                let block = DispatchWorkItem { [weak self] in
                    self?.timeoutTimerFired()
                }
                timeoutSource = _TimeoutSource(queue: queue, milliseconds: milliseconds, handler: block)
            }
        }
    }

    /// The timeout requested by libcurl.
    enum _Timeout {
        /// Fire after the given number of milliseconds.
        case milliseconds(Int)
        /// No timeout -- remove any existing timer.
        case none
        /// Fire as soon as possible.
        case immediate
    }
}
fileprivate extension URLSession._MultiHandle._Timeout {
    /// Interprets a raw timeout value.
    ///
    /// - Parameter timeout: `-1` maps to `.none`, `0` maps to `.immediate`,
    ///   every other value is taken as a number of milliseconds.
    init(timeout: Int) {
        if timeout == -1 {
            self = .none
        } else if timeout == 0 {
            self = .immediate
        } else {
            self = .milliseconds(timeout)
        }
    }
}
/// Read and write libdispatch sources for a specific socket.
///
/// A simple helper that combines two sources -- both being optional.
///
/// This info is stored into the socket using `curl_multi_assign()`.
///
/// - SeeAlso: URLSession._MultiHandle._SocketRegisterAction
fileprivate class _SocketSources {
    /// Source that fires when the socket is ready for reading, if registered.
    var readSource: DispatchSource?
    /// Source that fires when the socket is ready for writing, if registered.
    var writeSource: DispatchSource?

    /// Creates and resumes the read source. Does nothing if one exists already.
    ///
    /// - Parameters:
    ///   - fd: The file descriptor to watch.
    ///   - queue: The queue `handler` is run on.
    ///   - handler: The work item run whenever the source fires.
    func createReadSource(fileDescriptor fd: Int, queue: DispatchQueue, handler: DispatchWorkItem) {
        if readSource != nil {
            return
        }
        let source = DispatchSource.makeReadSource(fileDescriptor: Int32(fd), queue: queue)
        source.setEventHandler(handler: handler)
        readSource = source as? DispatchSource
        source.resume()
    }

    /// Creates and resumes the write source. Does nothing if one exists already.
    ///
    /// - Parameters:
    ///   - fd: The file descriptor to watch.
    ///   - queue: The queue `handler` is run on.
    ///   - handler: The work item run whenever the source fires.
    func createWriteSource(fileDescriptor fd: Int, queue: DispatchQueue, handler: DispatchWorkItem) {
        if writeSource != nil {
            return
        }
        let source = DispatchSource.makeWriteSource(fileDescriptor: Int32(fd), queue: queue)
        source.setEventHandler(handler: handler)
        writeSource = source as? DispatchSource
        source.resume()
    }

    /// Cancels both sources (where present) and forgets them.
    func tearDown() {
        readSource?.cancel()
        readSource = nil
        writeSource?.cancel()
        writeSource = nil
    }
}
extension _SocketSources {
    /// Create a read and/or write source as specified by the action.
    ///
    /// - Parameters:
    ///   - action: Decides which of the two sources are needed.
    ///   - fd: The file descriptor to watch.
    ///   - queue: The queue `handler` is run on.
    ///   - handler: The work item shared by the created source(s).
    func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, fileDescriptor fd: Int, queue: DispatchQueue, handler: DispatchWorkItem) {
        let wantsRead = action.needsReadSource
        let wantsWrite = action.needsWriteSource
        if wantsRead {
            createReadSource(fileDescriptor: fd, queue: queue, handler: handler)
        }
        if wantsWrite {
            createWriteSource(fileDescriptor: fd, queue: queue, handler: handler)
        }
    }
}
extension _SocketSources {
    /// Unwraps the `_SocketSources`.
    ///
    /// A `_SocketSources` is stored into the multi handle's socket using
    /// `curl_multi_assign()`. This helper unwraps it from the returned
    /// `UnsafeMutableRawPointer` without taking ownership (unretained).
    ///
    /// - Parameter ptr: The pointer stored for the socket, if any.
    /// - Returns: The stored object, or `nil` when `ptr` is `nil`.
    static func from(socketSourcePtr ptr: UnsafeMutableRawPointer?) -> _SocketSources? {
        if let stored = ptr {
            return Unmanaged<_SocketSources>.fromOpaque(stored).takeUnretainedValue()
        }
        return nil
    }
}
// `CFURLSessionMultiCode` values compare equal when their wrapped `value`s are equal.
extension CFURLSessionMultiCode : Equatable {}
/// Compares two multi codes by their underlying `value`.
public func ==(left: CFURLSessionMultiCode, right: CFURLSessionMultiCode) -> Bool {
    let sameValue = (left.value == right.value)
    return sameValue
}
/// Lets a libcurl multi code be thrown as a Swift error.
extension CFURLSessionMultiCode : Error {
    /// The error domain reported for libcurl multi codes.
    public var _domain: String {
        return "libcurl.Multi"
    }

    /// The numeric multi code, widened to `Int`.
    public var _code: Int {
        return Int(value)
    }
}
internal extension CFURLSessionMultiCode {
    /// Turns a libcurl multi code into Swift error handling.
    ///
    /// - Throws: `self` for every code other than `CFURLSessionMultiCodeOK`.
    func asError() throws {
        guard self == CFURLSessionMultiCodeOK else {
            throw self
        }
    }
}