// Foundation/NSURLSession/HTTPBodySource.swift - NSURLSession & libcurl
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
// -----------------------------------------------------------------------------
///
/// These are libcurl helpers for the URLSession API code.
/// - SeeAlso: https://curl.haxx.se/libcurl/c/
/// - SeeAlso: NSURLSession.swift
///
// -----------------------------------------------------------------------------
import CoreFoundation
import Dispatch

/// Turn `Data` into `DispatchData`.
internal func createDispatchData(_ data: Data) -> DispatchData {
    //TODO: Avoid copying data
    let count = data.count
    return data.withUnsafeBytes { (ptr: UnsafePointer<UInt8>) -> DispatchData in
        return DispatchData(bytes: UnsafeBufferPointer<UInt8>(start: ptr, count: count))
    }
}

/// Copy data from `DispatchData` into memory pointed to by an `UnsafeMutableBufferPointer`.
internal func copyDispatchData<T>(_ data: DispatchData, infoBuffer buffer: UnsafeMutableBufferPointer<T>) {
    precondition(data.count <= (buffer.count * MemoryLayout<T>.size))
    _ = data.copyBytes(to: buffer)
}

/// Split a `DispatchData` into a `(head, tail)` pair at the given position.
internal func splitData(dispatchData data: DispatchData, atPosition position: Int) -> (DispatchData, DispatchData) {
    /*let length = dispatch_data_get_size(data)
    let head = dispatch_data_create_subrange(data, 0, position)
    let tail = dispatch_data_create_subrange(data, position, length - position)
    return (head, tail)*/
    return (data.subdata(in: 0..<position), data.subdata(in: position..<data.count))
}
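
// Illustrative sketch, not part of the original file: `_exampleSplitBody` only
// demonstrates how `createDispatchData(_:)` and `splitData(dispatchData:atPosition:)`
// compose; the literal body string and split position are arbitrary.
internal func _exampleSplitBody() {
    let body = Data("Hello, body!".utf8)            // 12 bytes of request body
    let dispatchBody = createDispatchData(body)     // copied into a DispatchData
    // Split after the first 5 bytes: head == "Hello", tail == ", body!".
    let (head, tail) = splitData(dispatchData: dispatchBody, atPosition: 5)
    precondition(head.count == 5 && tail.count == dispatchBody.count - 5)
}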

/// A (non-blocking) source for HTTP body data.
internal protocol _HTTPBodySource: class {
    /// Get the next chunk of data.
    ///
    /// - Returns: `.data` until the source is exhausted, at which point it
    /// returns `.done`. Since this is non-blocking, it returns `.retryLater`
    /// if no data is available right now but more is expected to become
    /// available later.
    func getNextChunk(withLength length: Int) -> _HTTPBodySourceDataChunk
}

internal enum _HTTPBodySourceDataChunk {
    /// A chunk of body data.
    case data(DispatchData)
    /// The source is depleted.
    case done
    /// Retry later to get more data.
    case retryLater
    /// An error occurred while reading data.
    case error
}
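
// Illustrative sketch, not part of the original file: how a caller might map the
// chunk cases onto actions for a libcurl read callback. The `_ExampleFillResult`
// type and `_exampleFill(length:from:)` are hypothetical; the real wiring lives
// in the easy-handle code in NSURLSession.swift.
internal enum _ExampleFillResult {
    case bytes(DispatchData) // copy these bytes into libcurl's buffer
    case finished            // report end of the body
    case pause               // `.retryLater`: ask libcurl to pause the upload
    case abort               // `.error`: abort the transfer
}
internal func _exampleFill(length: Int, from source: _HTTPBodySource) -> _ExampleFillResult {
    switch source.getNextChunk(withLength: length) {
    case .data(let chunk): return .bytes(chunk)
    case .done:            return .finished
    case .retryLater:      return .pause
    case .error:           return .abort
    }
}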

/// An HTTP body data source backed by `DispatchData`.
internal final class _HTTPBodyDataSource {
    var data: DispatchData!
    init(data: DispatchData) {
        self.data = data
    }
}

extension _HTTPBodyDataSource : _HTTPBodySource {
    enum _Error : Error {
        case unableToRewindData
    }

    func getNextChunk(withLength length: Int) -> _HTTPBodySourceDataChunk {
        let remaining = data.count
        if remaining == 0 {
            return .done
        } else if remaining <= length {
            // Everything that is left fits into the requested length:
            // hand it all out and mark the source as drained.
            let r: DispatchData! = data
            data = nil
            return .data(r)
        } else {
            let (chunk, remainder) = splitData(dispatchData: data, atPosition: length)
            data = remainder
            return .data(chunk)
        }
    }
}
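
// Illustrative sketch, not part of the original file: draining a
// `_HTTPBodyDataSource` in fixed-size chunks. The chunk length of 16 bytes is
// arbitrary; a data-backed source never returns `.retryLater` or `.error`.
internal func _exampleDrainDataSource(_ body: Data) -> [DispatchData] {
    let source = _HTTPBodyDataSource(data: createDispatchData(body))
    var chunks: [DispatchData] = []
    while true {
        switch source.getNextChunk(withLength: 16) {
        case .data(let chunk):
            chunks.append(chunk)
        case .done:
            return chunks
        case .retryLater, .error:
            fatalError("A data-backed source neither retries nor fails.")
        }
    }
}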

/// An HTTP body data source backed by a file.
///
/// This allows non-blocking streaming of file data to the remote server.
///
/// The source reads data using a `dispatch_io_t` channel, and hence reading
/// file data is non-blocking. It has a local buffer that it fills as calls
/// to `getNextChunk(withLength:)` drain it.
///
/// - Note: Calls to `getNextChunk(withLength:)` and callbacks from libdispatch
/// should all happen on the same (serial) queue, and hence this code doesn't
/// have to be thread safe.
internal final class _HTTPBodyFileSource {
    fileprivate let fileURL: URL
    fileprivate let channel: DispatchIO
    fileprivate let workQueue: DispatchQueue
    fileprivate let dataAvailableHandler: () -> Void
    fileprivate var hasActiveReadHandler = false
    fileprivate var availableChunk: _Chunk = .empty

    /// Create a new data source backed by a file.
    ///
    /// - Parameter fileURL: The file to read from.
    /// - Parameter workQueue: The queue that it's safe to call
    ///     `getNextChunk(withLength:)` on, and that the `dataAvailableHandler`
    ///     will be called on.
    /// - Parameter dataAvailableHandler: Will be called when data becomes
    ///     available. Reading data is done in a non-blocking way, such that
    ///     no data may be available even if there's more data in the file.
    ///     If `getNextChunk(withLength:)` returns `.retryLater`, this handler
    ///     will be called once data becomes available.
    init(fileURL: URL, workQueue: DispatchQueue, dataAvailableHandler: @escaping () -> Void) {
        guard fileURL.isFileURL else { fatalError("The body data URL must be a file URL.") }
        self.fileURL = fileURL
        self.workQueue = workQueue
        self.dataAvailableHandler = dataAvailableHandler
        // The file system representation pointer is only valid for the duration
        // of the closure, so the dispatch I/O channel has to be created inside it.
        guard let channel = fileURL.withUnsafeFileSystemRepresentation({ path -> DispatchIO? in
            guard let path = path else { return nil }
            return DispatchIO(type: .stream, path: path, oflag: O_RDONLY, mode: 0,
                              queue: workQueue, cleanupHandler: { _ in })
        }) else {
            fatalError("Unable to create a read channel for the body data file.")
        }
        self.channel = channel
        self.channel.setLimit(highWater: CFURLSessionMaxWriteSize)
    }

    fileprivate enum _Chunk {
        /// Nothing has been read yet.
        case empty
        /// An error has occurred while reading.
        case errorDetected(Int)
        /// Data has been read.
        case data(DispatchData)
        /// All data has been read from the file (EOF).
        case done(DispatchData?)
    }
}

fileprivate extension _HTTPBodyFileSource {
    fileprivate var desiredBufferLength: Int { return 3 * CFURLSessionMaxWriteSize }

    /// Enqueue a dispatch I/O read to fill the buffer.
    ///
    /// - Note: This is a no-op if the buffer is full, or if a read operation
    /// is already enqueued.
    fileprivate func readNextChunk() {
        // libcurl likes to use a buffer of size CFURLSessionMaxWriteSize; we'll
        // try to keep 3x that around in the `availableChunk` buffer.
        guard availableByteCount < desiredBufferLength else { return }
        guard !hasActiveReadHandler else { return } // We're already reading
        hasActiveReadHandler = true

        let lengthToRead = desiredBufferLength - availableByteCount
        channel.read(offset: 0, length: lengthToRead, queue: workQueue) { (done: Bool, data: DispatchData?, errno: Int32) in
            let wasEmpty = self.availableByteCount == 0
            self.hasActiveReadHandler = !done

            switch (done, data, errno) {
            case (true, _, errno) where errno != 0:
                self.availableChunk = .errorDetected(Int(errno))
            case (true, .some(let d), 0) where d.count == 0:
                self.append(data: d, endOfFile: true)
            case (true, .some(let d), 0):
                self.append(data: d, endOfFile: false)
            case (false, .some(let d), 0):
                self.append(data: d, endOfFile: false)
            default:
                fatalError("Invalid arguments to read(3) callback.")
            }

            if wasEmpty && (0 < self.availableByteCount) {
                self.dataAvailableHandler()
            }
        }
    }

    fileprivate func append(data: DispatchData, endOfFile: Bool) {
        switch availableChunk {
        case .empty:
            availableChunk = endOfFile ? .done(data) : .data(data)
        case .errorDetected:
            break
        case .data(var oldData):
            oldData.append(data)
            availableChunk = endOfFile ? .done(oldData) : .data(oldData)
        case .done:
            fatalError("Trying to append data, but end-of-file was already detected.")
        }
    }

    fileprivate var availableByteCount: Int {
        switch availableChunk {
        case .empty: return 0
        case .errorDetected: return 0
        case .data(let d): return d.count
        case .done(.some(let d)): return d.count
        case .done(.none): return 0
        }
    }
}

extension _HTTPBodyFileSource : _HTTPBodySource {
    func getNextChunk(withLength length: Int) -> _HTTPBodySourceDataChunk {
        switch availableChunk {
        case .empty:
            readNextChunk()
            return .retryLater
        case .errorDetected:
            return .error
        case .data(let data):
            let l = min(length, data.count)
            let (head, tail) = splitData(dispatchData: data, atPosition: l)

            availableChunk = (tail.count == 0) ? .empty : .data(tail)
            readNextChunk()

            if head.count == 0 {
                return .retryLater
            } else {
                return .data(head)
            }
        case .done(.some(let data)):
            let l = min(length, data.count)
            let (head, tail) = splitData(dispatchData: data, atPosition: l)
            availableChunk = (tail.count == 0) ? .done(nil) : .done(tail)
            if head.count == 0 {
                return .done
            } else {
                return .data(head)
            }
        case .done(.none):
            return .done
        }
    }
}
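
// Illustrative sketch, not part of the original file: creating a file-backed
// body source. The queue label and the `resumeUpload` callback are hypothetical;
// in the real session code the data-available handler would unpause the transfer
// so that libcurl asks for body data again after a `.retryLater`.
internal func _exampleMakeFileSource(fileURL: URL, resumeUpload: @escaping () -> Void) -> _HTTPBodyFileSource {
    let queue = DispatchQueue(label: "body-source.example") // serial queue shared by reads and getNextChunk(withLength:)
    return _HTTPBodyFileSource(fileURL: fileURL, workQueue: queue) {
        // Called on `queue` once data that was previously unavailable has been
        // read into the buffer.
        resumeUpload()
    }
}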