blob: ab09c1fd8bfb16f19b7df412b3bd64b03b4591e6 [file] [log] [blame]
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See http://swift.org/LICENSE.txt for license information
// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
#if DEPLOYMENT_ENABLE_LIBDISPATCH
import Dispatch
#endif
import CoreFoundation
open class Operation : NSObject {
    /// Guards every piece of mutable state below. All reads and writes of the
    /// underscored flags must happen while this lock is held so that state
    /// observed from other threads is consistent.
    let lock = NSLock()
    internal weak var _queue: OperationQueue?
    internal var _cancelled = false
    internal var _executing = false
    internal var _finished = false
    internal var _ready = false
    internal var _dependencies = Set<Operation>()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
    /// Entered in init, left in finish(); waitUntilFinished() blocks on it.
    internal var _group = DispatchGroup()
    /// Entered once per dependency added to us; _waitUntilReady() blocks on it.
    internal var _depGroup = DispatchGroup()
    /// Dependency groups of *other* operations that are waiting on us to
    /// finish; all of them are left in _leaveGroups().
    internal var _groups = [DispatchGroup]()
#endif

    public override init() {
        super.init()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        _group.enter()
#endif
    }

    internal func _leaveGroups() {
        // assumes lock is taken
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        _groups.forEach() { $0.leave() }
        _groups.removeAll()
        _group.leave()
#endif
    }

    /// Runs main() unless the operation was already cancelled, maintaining the
    /// isExecuting flag around the call, and unconditionally finishes so that
    /// dependents and waiters are always unblocked.
    open func start() {
        if !isCancelled {
            lock.lock()
            _executing = true
            lock.unlock()
            main()
            lock.lock()
            _executing = false
            lock.unlock()
        }
        finish()
    }

    /// Marks the operation finished, releases everything blocked on us
    /// (dependents and waitUntilFinished callers), detaches from the queue and
    /// schedules the completion block.
    internal func finish() {
        lock.lock()
        _finished = true
        _leaveGroups()
        lock.unlock()
        if let queue = _queue {
            queue._operationFinished(self)
        }
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        // The completion block property is a bit cagey and can not be executed locally on the queue due to thread exhaust potentials.
        // This sets up for some strange behavior of finishing operations since the handler will be executed on a different queue
        if let completion = completionBlock {
            DispatchQueue.global(qos: .background).async { () -> Void in
                completion()
            }
        }
#endif
    }

    open func main() { }

    open var isCancelled: Bool {
        // Read under the lock: cancel() mutates _cancelled under the same lock
        // from arbitrary threads.
        let wasCancelled: Bool
        lock.lock()
        wasCancelled = _cancelled
        lock.unlock()
        return wasCancelled
    }

    open func cancel() {
        // Note that calling cancel() is advisory. It is up to the main() function to
        // call isCancelled at appropriate points in its execution flow and to do the
        // actual canceling work. Eventually main() will invoke finish() and this is
        // where we then leave the groups and unblock other operations that might
        // depend on us.
        lock.lock()
        _cancelled = true
        lock.unlock()
    }

    open var isExecuting: Bool {
        let wasExecuting: Bool
        lock.lock()
        wasExecuting = _executing
        lock.unlock()
        return wasExecuting
    }

    open var isFinished: Bool {
        // Read under the lock: finish() mutates _finished under the same lock.
        let wasFinished: Bool
        lock.lock()
        wasFinished = _finished
        lock.unlock()
        return wasFinished
    }

    // - Note: This property is NEVER used in the objective-c implementation!
    open var isAsynchronous: Bool {
        return false
    }

    open var isReady: Bool {
        // Read under the lock: _waitUntilReady() mutates _ready.
        let wasReady: Bool
        lock.lock()
        wasReady = _ready
        lock.unlock()
        return wasReady
    }

    /// Registers `op` as a dependency: we will not become ready until `op`
    /// finishes (it leaves our _depGroup from its _leaveGroups()).
    open func addDependency(_ op: Operation) {
        lock.lock()
        _dependencies.insert(op)
        op.lock.lock()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        _depGroup.enter()
        op._groups.append(_depGroup)
#endif
        op.lock.unlock()
        lock.unlock()
    }

    /// Unregisters `op`; balances the _depGroup.enter() done in addDependency.
    open func removeDependency(_ op: Operation) {
        lock.lock()
        _dependencies.remove(op)
        op.lock.lock()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        let groupIndex = op._groups.index(where: { $0 === self._depGroup })
        if let idx = groupIndex {
            let group = op._groups.remove(at: idx)
            group.leave()
        }
#endif
        op.lock.unlock()
        lock.unlock()
    }

    open var dependencies: [Operation] {
        lock.lock()
        let ops = _dependencies.map() { $0 }
        lock.unlock()
        return ops
    }

    open var queuePriority: QueuePriority = .normal
    public var completionBlock: (() -> Void)?

    /// Blocks the caller until finish() has run (i.e. until _group is left).
    open func waitUntilFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        _group.wait()
#endif
    }

    open var threadPriority: Double = 0.5

    /// - Note: Quality of service is not directly supported here since there are not qos class promotions available outside of darwin targets.
    open var qualityOfService: QualityOfService = .default

    open var name: String?

    /// Blocks until every dependency has finished, then flips the ready flag.
    internal func _waitUntilReady() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        _depGroup.wait()
#endif
        lock.lock()
        _ready = true
        lock.unlock()
    }
}
/// The two methods below exist to support Operations that complete
/// asynchronously with respect to the queue that started them. Darwin relies
/// on KVO notifications for this; since non-Darwin platforms have no KVO,
/// these NSObject methods are re-declared here as a stopgap and should go
/// away once a proper replacement exists.
extension Operation {
    public func willChangeValue(forKey key: String) {
        // Intentionally a no-op: there is no KVO machinery to notify.
    }

    public func didChangeValue(forKey key: String) {
        // Only a transition of "isFinished" to true is of interest; it means
        // an asynchronous operation has completed and must be finished here.
        guard key == "isFinished" && isFinished else {
            return
        }
        finish()
    }
}
extension Operation {
    /// Relative scheduling priority of an operation inside an OperationQueue,
    /// ordered from lowest to highest (raw values 0 through 4).
    public enum QueuePriority : Int {
        case veryLow, low, normal, high, veryHigh
    }
}
/// An Operation whose work consists of one primary closure plus any number of
/// additional closures appended via addExecutionBlock(_:).
open class BlockOperation: Operation {
    typealias ExecutionBlock = () -> Void
    internal var _block: () -> Void
    internal var _executionBlocks = [ExecutionBlock]()

    public init(block: @escaping () -> Void) {
        _block = block
    }

    override open func main() {
        // Snapshot the closures while holding the lock, then invoke them
        // outside of it so user code cannot deadlock against
        // addExecutionBlock(_:) or the executionBlocks getter.
        lock.lock()
        let primary = _block
        let extras = _executionBlocks
        lock.unlock()

        primary()
        for extra in extras {
            extra()
        }
    }

    /// Appends another closure to run (after the primary block) when the
    /// operation executes.
    open func addExecutionBlock(_ block: @escaping () -> Void) {
        lock.lock()
        defer { lock.unlock() }
        _executionBlocks.append(block)
    }

    /// A snapshot of the additional closures registered so far.
    open var executionBlocks: [() -> Void] {
        lock.lock()
        defer { lock.unlock() }
        return _executionBlocks
    }
}
public extension OperationQueue {
    /// Sentinel value of maxConcurrentOperationCount meaning "no explicit
    /// limit". (The access level is inherited from `public extension`;
    /// repeating `public` on the member triggers a redundant-modifier warning
    /// in recent Swift compilers.)
    static let defaultMaxConcurrentOperationCount: Int = Int.max
}
/// A priority queue of Operations: one FIFO bucket per QueuePriority plus a
/// master list (`all`) of everything currently enqueued.
internal struct _OperationList {
    var veryLow = [Operation]()
    var low = [Operation]()
    var normal = [Operation]()
    var high = [Operation]()
    var veryHigh = [Operation]()
    var all = [Operation]()

    /// Removes the first occurrence of `operation` from `list`, if present.
    /// Shared by remove(_:) to avoid repeating the find-and-remove stanza for
    /// every priority bucket.
    private static func _removeFirst(_ operation: Operation, from list: inout [Operation]) {
        if let idx = list.index(of: operation) {
            list.remove(at: idx)
        }
    }

    /// Appends `operation` to the master list and to its priority bucket.
    mutating func insert(_ operation: Operation) {
        all.append(operation)
        switch operation.queuePriority {
        case .veryLow:
            veryLow.append(operation)
        case .low:
            low.append(operation)
        case .normal:
            normal.append(operation)
        case .high:
            high.append(operation)
        case .veryHigh:
            veryHigh.append(operation)
        }
    }

    /// Removes `operation` from the master list and from the bucket matching
    /// its current queuePriority.
    mutating func remove(_ operation: Operation) {
        _OperationList._removeFirst(operation, from: &all)
        switch operation.queuePriority {
        case .veryLow:
            _OperationList._removeFirst(operation, from: &veryLow)
        case .low:
            _OperationList._removeFirst(operation, from: &low)
        case .normal:
            _OperationList._removeFirst(operation, from: &normal)
        case .high:
            _OperationList._removeFirst(operation, from: &high)
        case .veryHigh:
            _OperationList._removeFirst(operation, from: &veryHigh)
        }
    }

    /// Pops the next operation to run: highest-priority non-empty bucket
    /// first, FIFO within a bucket. Returns nil when everything is empty.
    mutating func dequeue() -> Operation? {
        if !veryHigh.isEmpty {
            return veryHigh.remove(at: 0)
        }
        if !high.isEmpty {
            return high.remove(at: 0)
        }
        if !normal.isEmpty {
            return normal.remove(at: 0)
        }
        if !low.isEmpty {
            return low.remove(at: 0)
        }
        if !veryLow.isEmpty {
            return veryLow.remove(at: 0)
        }
        return nil
    }

    /// Number of operations currently enqueued (cancelled or not).
    var count: Int {
        return all.count
    }

    func map<T>(_ transform: (Operation) throws -> T) rethrows -> [T] {
        return try all.map(transform)
    }
}
open class OperationQueue: NSObject {
    /// Guards all mutable state on the queue (operation list, suspension flag,
    /// name, underlying dispatch queue and concurrency gate).
    let lock = NSLock()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
    /// Limits how many operations may run at once when
    /// maxConcurrentOperationCount is 1 (serial) or some other finite value.
    /// nil means unlimited concurrency.
    var __concurrencyGate: DispatchSemaphore?
    var __underlyingQueue: DispatchQueue? {
        didSet {
            // Keep the queue-specific back-pointer in sync so that
            // OperationQueue.current can find us from inside a dispatch block.
            let key = OperationQueue.OperationQueueKey
            oldValue?.setSpecific(key: key, value: nil)
            __underlyingQueue?.setSpecific(key: key, value: Unmanaged.passUnretained(self))
        }
    }
    /// Tracks every block dispatched on behalf of this queue;
    /// waitUntilAllOperationsAreFinished() blocks on it.
    let queueGroup = DispatchGroup()
#endif

    var _operations = _OperationList()
#if DEPLOYMENT_ENABLE_LIBDISPATCH
    internal var _concurrencyGate: DispatchSemaphore? {
        get {
            lock.lock()
            let val = __concurrencyGate
            lock.unlock()
            return val
        }
    }

    // This is NOT the behavior of the objective-c variant; it will never re-use a queue and instead for every operation it will create a new one.
    // However this is considerably faster and probably more efficient.
    internal var _underlyingQueue: DispatchQueue {
        lock.lock()
        if let queue = __underlyingQueue {
            lock.unlock()
            return queue
        } else {
            // Lazily build the dispatch queue (and concurrency gate) the first
            // time it is needed, honoring name, width and suspension state.
            let effectiveName: String
            if let requestedName = _name {
                effectiveName = requestedName
            } else {
                effectiveName = "NSOperationQueue::\(Unmanaged.passUnretained(self).toOpaque())"
            }
            let attr: DispatchQueue.Attributes
            if maxConcurrentOperationCount == 1 {
                attr = []
                __concurrencyGate = DispatchSemaphore(value: 1)
            } else {
                attr = .concurrent
                if maxConcurrentOperationCount != OperationQueue.defaultMaxConcurrentOperationCount {
                    __concurrencyGate = DispatchSemaphore(value:maxConcurrentOperationCount)
                }
            }
            let queue = DispatchQueue(label: effectiveName, attributes: attr)
            if _suspended {
                queue.suspend()
            }
            __underlyingQueue = queue
            lock.unlock()
            return queue
        }
    }
#endif

    public override init() {
        super.init()
    }

#if DEPLOYMENT_ENABLE_LIBDISPATCH
    /// Internal initializer used to wrap an existing dispatch queue (e.g. the
    /// main queue) in an OperationQueue.
    internal init(_queue queue: DispatchQueue, maxConcurrentOperations: Int = OperationQueue.defaultMaxConcurrentOperationCount) {
        __underlyingQueue = queue
        maxConcurrentOperationCount = maxConcurrentOperations
        super.init()
        queue.setSpecific(key: OperationQueue.OperationQueueKey, value: Unmanaged.passUnretained(self))
    }
#endif

    internal func _dequeueOperation() -> Operation? {
        lock.lock()
        let op = _operations.dequeue()
        lock.unlock()
        return op
    }

    open func addOperation(_ op: Operation) {
        addOperations([op], waitUntilFinished: false)
    }

    /// Executed once per enqueued operation on the underlying dispatch queue;
    /// note that the dequeued operation need not be the one whose enqueue
    /// scheduled this block (see the comment in addOperations).
    internal func _runOperation() {
        if let op = _dequeueOperation() {
            if !op.isCancelled {
                op._waitUntilReady()
                // Re-check: the operation may have been cancelled while we
                // were blocked waiting for its dependencies.
                if !op.isCancelled {
                    op.start()
                }
            }
        }
    }

    open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        var waitGroup: DispatchGroup?
        if wait {
            waitGroup = DispatchGroup()
        }
#endif
        /*
        If QueuePriority was not supported this could be much faster
        since it would not need to have the extra book-keeping for managing a priority
        queue. However this implementation attempts to be similar to the specification.
        As a concequence this means that the dequeue may NOT nessicarly be the same as
        the enqueued operation in this callout. So once the dispatch_block is created
        the operation must NOT be touched; since it has nothing to do with the actual
        execution. The only differential is that the block enqueued to dispatch_async
        is balanced with the number of Operations enqueued to the OperationQueue.
        */
        lock.lock()
        ops.forEach { (operation: Operation) -> Void in
            operation._queue = self
            _operations.insert(operation)
        }
        lock.unlock()
        ops.forEach { (operation: Operation) -> Void in
#if DEPLOYMENT_ENABLE_LIBDISPATCH
            if let group = waitGroup {
                group.enter()
            }
            let block = DispatchWorkItem(flags: .enforceQoS) { () -> Void in
                // Funnel through the gate (if any) to honor the queue's
                // maximum concurrency, then run one pending operation.
                if let sema = self._concurrencyGate {
                    sema.wait()
                    self._runOperation()
                    sema.signal()
                } else {
                    self._runOperation()
                }
                if let group = waitGroup {
                    group.leave()
                }
            }
            _underlyingQueue.async(group: queueGroup, execute: block)
#endif
        }
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        if let group = waitGroup {
            group.wait()
        }
#endif
    }

    /// Called by Operation.finish(): drops the operation from our book-keeping.
    internal func _operationFinished(_ operation: Operation) {
        lock.lock()
        _operations.remove(operation)
        operation._queue = nil
        lock.unlock()
    }

    open func addOperation(_ block: @escaping () -> Swift.Void) {
        let op = BlockOperation(block: block)
        op.qualityOfService = qualityOfService
        addOperation(op)
    }

    // WARNING: the return value of this property can never be used to reliably do anything sensible
    open var operations: [Operation] {
        lock.lock()
        let ops = _operations.map() { $0 }
        lock.unlock()
        return ops
    }

    // WARNING: the return value of this property can never be used to reliably do anything sensible
    open var operationCount: Int {
        lock.lock()
        let count = _operations.count
        lock.unlock()
        return count
    }

    /// - Note: only consulted when the underlying dispatch queue is first
    ///   created; changing it afterwards has no effect.
    open var maxConcurrentOperationCount: Int = OperationQueue.defaultMaxConcurrentOperationCount

    internal var _suspended = false
    open var isSuspended: Bool {
        get {
            // Read under the lock: the setter mutates _suspended while holding
            // the same lock, potentially from another thread.
            let wasSuspended: Bool
            lock.lock()
            wasSuspended = _suspended
            lock.unlock()
            return wasSuspended
        }
        set {
            lock.lock()
            if _suspended != newValue {
                _suspended = newValue
#if DEPLOYMENT_ENABLE_LIBDISPATCH
                if let queue = __underlyingQueue {
                    if newValue {
                        queue.suspend()
                    } else {
                        queue.resume()
                    }
                }
#endif
            }
            lock.unlock()
        }
    }

    internal var _name: String?
    open var name: String? {
        get {
            lock.lock()
            let val = _name
            lock.unlock()
            return val
        }
        set {
            lock.lock()
            _name = newValue
#if DEPLOYMENT_ENABLE_LIBDISPATCH
            // Drop the lazily-created queue so the next use rebuilds it with
            // the new label.
            __underlyingQueue = nil
#endif
            lock.unlock()
        }
    }

    open var qualityOfService: QualityOfService = .default
#if DEPLOYMENT_ENABLE_LIBDISPATCH
    // Note: this will return non nil whereas the objective-c version will only return non nil when it has been set.
    // it uses a target queue assignment instead of returning the actual underlying queue.
    open var underlyingQueue: DispatchQueue? {
        get {
            lock.lock()
            let queue = __underlyingQueue
            lock.unlock()
            return queue
        }
        set {
            lock.lock()
            __underlyingQueue = newValue
            lock.unlock()
        }
    }
#endif

    /// Advisory cancellation of everything currently enqueued; the cancel
    /// calls happen outside the lock because cancel() takes each operation's
    /// own lock.
    open func cancelAllOperations() {
        lock.lock()
        let ops = _operations.map() { $0 }
        lock.unlock()
        ops.forEach() { $0.cancel() }
    }

    open func waitUntilAllOperationsAreFinished() {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        queueGroup.wait()
#endif
    }

#if DEPLOYMENT_ENABLE_LIBDISPATCH
    /// Queue-specific key used to map a dispatch queue back to its owning
    /// OperationQueue (see `current`).
    static let OperationQueueKey = DispatchSpecificKey<Unmanaged<OperationQueue>>()
#endif

    /// The OperationQueue associated with the dispatch queue the caller is
    /// currently executing on, if any; falls back to `main` on the main thread.
    open class var current: OperationQueue? {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        guard let specific = DispatchQueue.getSpecific(key: OperationQueue.OperationQueueKey) else {
            if _CFIsMainThread() {
                return OperationQueue.main
            } else {
                return nil
            }
        }
        return specific.takeUnretainedValue()
#else
        return nil
#endif
    }

#if DEPLOYMENT_ENABLE_LIBDISPATCH
    /// The shared serial queue wrapping the main dispatch queue.
    private static let _main = OperationQueue(_queue: .main, maxConcurrentOperations: 1)
#endif

    open class var main: OperationQueue {
#if DEPLOYMENT_ENABLE_LIBDISPATCH
        return _main
#else
        fatalError("OperationQueue requires libdispatch")
#endif
    }
}