blob: 1d2b0a11fce38aab2723b6d10cd19feaf4dc142e [file] [log] [blame]
// This source file is part of the Swift.org open source project
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
// RUN: %target-run-stdlib-swift
// REQUIRES: executable_test
// FIXME: This test runs very slowly on watchOS.
// UNSUPPORTED: OS=watchos
/// A description of how many elements a sequence holds, with a tag saying
/// how trustworthy the number is.
public enum ApproximateCount {
  /// No count information is carried.
  case Unknown

  /// Carries a count tagged as precise.
  case Precise(IntMax)

  /// Carries a count tagged as an underestimate.
  case Underestimate(IntMax)

  /// Carries a count tagged as an overestimate.
  case Overestimate(IntMax)
}
/// A sequence that exposes an `ApproximateCount` for its elements.
public protocol ApproximateCountableSequence : Sequence {
  /// The approximate number of elements in `self`.
  ///
  /// Complexity: amortized O(1).
  var approximateCount: ApproximateCount { get }
}
/// A collection that provides an efficient way to split its index ranges.
public protocol SplittableCollection : Collection {
// We need this protocol so that collections with only forward or bidirectional
// traversals could customize their splitting behavior.
// FIXME: all collections with random access should conform to this protocol
// automatically.
/// Splits a given range of indices into a set of disjoint ranges covering
/// the same elements.
///
/// - Parameter range: the range of indices to subdivide.
/// - Returns: the sub-ranges, which together cover `range`.
///
/// Complexity: amortized O(1).
/// FIXME: should that be O(log n) to cover some strange collections?
/// FIXME: index invalidation rules?
/// FIXME: a better name. Users will never want to call this method
/// directly.
/// FIXME: return an optional for the common case when split() cannot
/// subdivide the range further.
func split(_ range: Range<Index>) -> [Range<Index>]
/// Halves `range` at its midpoint.
///
/// - Parameters:
///   - elements: the collection whose indices `range` refers to.
///   - range: the index range to divide.
/// - Returns: `[range]` when it spans fewer than two elements; otherwise two
///   adjacent ranges, the first holding `length / 2` elements.
internal func _splitRandomAccessIndexRange<
  C : RandomAccessCollection
>(
  _ elements: C,
  _ range: Range<C.Index>
) -> [Range<C.Index>] {
  let lower = range.lowerBound
  let upper = range.upperBound
  let span = elements.distance(from: lower, to: upper)

  // Nothing to divide: hand the range back untouched.
  guard span >= 2 else {
    return [range]
  }

  let pivot = elements.index(lower, offsetBy: span / 2)
  return [lower ..< pivot, pivot ..< upper]
}
/// A helper object to build a collection incrementally in an efficient way.
/// Using a builder can be more efficient than creating an empty collection
/// instance and adding elements one by one.
public protocol CollectionBuilder {
/// The collection type produced by `takeResult()`.
associatedtype Destination : Collection
/// The element type accepted by `append(contentsOf:)`; defaults to the
/// destination's element type.
associatedtype Element = Destination.Iterator.Element
/// Gives a hint about the expected approximate number of elements in the
/// collection that is being built.
mutating func sizeHint(_ approximateSize: Int)
/// Append `element` to `self`.
/// If a collection being built supports a user-defined order, the element is
/// added at the end.
/// Complexity: amortized O(1).
mutating func append(_ element: Destination.Iterator.Element)
/// Append `elements` to `self`.
/// If a collection being built supports a user-defined order, the elements
/// are added at the end.
/// Complexity: amortized O(n), where `n` is equal to `elements.count`.
mutating func append<
C : Collection
>(contentsOf elements: C)
where C.Iterator.Element == Element
/// Append elements from `otherBuilder` to `self`, emptying `otherBuilder`.
/// Equivalent to::
/// self.append(contentsOf: otherBuilder.takeResult())
/// but is more efficient.
/// Complexity: O(1).
mutating func moveContentsOf(_ otherBuilder: inout Self)
/// Build the collection from the elements that were added to this builder.
/// Once this function is called, the builder may not be reused and no other
/// methods should be called.
/// Complexity: O(n) or better (where `n` is the number of elements that were
/// added to this builder); typically O(1).
mutating func takeResult() -> Destination
/// A collection that declares an associated `CollectionBuilder` type.
public protocol BuildableCollectionProtocol : Collection {
/// The builder type associated with this collection.
associatedtype Builder : CollectionBuilder
extension Array : SplittableCollection {
  /// Splits `range` using the shared random-access splitting helper.
  public func split(_ range: Range<Int>) -> [Range<Int>] {
    let pieces = _splitRandomAccessIndexRange(self, range)
    return pieces
  }
}
/// A `CollectionBuilder` that produces an `Array`.
///
/// New elements go into `_resultTail`. Whenever whole builders are merged the
/// tail is sealed into `_resultParts`, so `takeResult()` only has to
/// concatenate the parts in order.
public struct ArrayBuilder<T> : CollectionBuilder {
  // FIXME: the compiler didn't complain when I remove public on 'Collection'.
  // File a bug.
  public typealias Destination = Array<T>
  public typealias Element = T

  /// Sealed chunks, in element order.
  internal var _resultParts = [[T]]()

  /// The chunk that currently receives appended elements.
  internal var _resultTail = [T]()

  public init() {}

  /// Reserves room in the current tail for `approximateSize` elements.
  public mutating func sizeHint(_ approximateSize: Int) {
    _resultTail.reserveCapacity(approximateSize)
  }

  /// Appends `element` at the end of the collection being built.
  public mutating func append(_ element: T) {
    _resultTail.append(element)
  }

  /// Appends every element of `elements` at the end of the collection being
  /// built.
  public mutating func append<
    C : Collection
  >(contentsOf elements: C)
    where C.Iterator.Element == T {
    _resultTail.append(contentsOf: elements)
  }

  /// Moves everything accumulated in `otherBuilder` after the contents of
  /// `self`, leaving `otherBuilder` empty.
  public mutating func moveContentsOf(_ otherBuilder: inout ArrayBuilder<T>) {
    // FIXME: do something smart with the capacity set in this builder and the
    // other builder.
    // Seal our own tail first so its elements stay ahead of the other
    // builder's elements instead of being thrown away.
    _resultParts.append(_resultTail)
    _resultTail = []
    // FIXME: not O(1)!
    _resultParts.append(contentsOf: otherBuilder._resultParts)
    otherBuilder._resultParts = []
    // Our tail is empty at this point, so the swap adopts the other tail and
    // leaves `otherBuilder` with an empty one.
    swap(&_resultTail, &otherBuilder._resultTail)
  }

  /// Returns all accumulated elements as a single array.
  public mutating func takeResult() -> Destination {
    // Seal the tail so the most recently appended elements are included.
    _resultParts.append(_resultTail)
    _resultTail = []
    // FIXME: optimize. parallelize.
    return Array(_resultParts.joined())
  }
}
// Declares `ArrayBuilder` as the builder type for `Array`.
extension Array : BuildableCollectionProtocol {
/// The builder that produces arrays of this element type.
public typealias Builder = ArrayBuilder<Element>
// Fork-join
// As sad as it is, I think for practical performance reasons we should rewrite
// the inner parts of the fork-join framework in C++. In way too many cases
// than necessary Swift requires an extra allocation to pin objects in memory
// for safe multithreaded access. -Dmitri
import SwiftShims
import SwiftPrivate
import Darwin
import Dispatch
// FIXME: port to Linux.
// XFAIL: linux
// A wrapper for pthread_t with platform-independent interface.
public struct _stdlib_pthread_t : Equatable, Hashable {
  /// The wrapped native thread handle.
  internal let _value: pthread_t

  /// Hash of the wrapped handle.
  public var hashValue: Int {
    let wrappedHash = _value.hashValue
    return wrappedHash
  }
}

/// Two wrappers are equal when their wrapped handles compare equal.
public func == (lhs: _stdlib_pthread_t, rhs: _stdlib_pthread_t) -> Bool {
  let left = lhs._value
  let right = rhs._value
  return left == right
}

/// Returns a wrapper around the handle reported by `pthread_self()`.
public func _stdlib_pthread_self() -> _stdlib_pthread_t {
  let current = pthread_self()
  return _stdlib_pthread_t(_value: current)
}
/// A thin wrapper around a heap-allocated `pthread_mutex_t`.
///
/// The struct has no automatic cleanup: whoever owns it must call
/// `` `deinit`() `` exactly once when the mutex is no longer needed.
struct _ForkJoinMutex {
  var _mutex: UnsafeMutablePointer<pthread_mutex_t>

  /// Allocates and initializes the mutex with default attributes.
  init() {
    _mutex = UnsafeMutablePointer.allocate(capacity: 1)
    if pthread_mutex_init(_mutex, nil) != 0 {
      fatalError("pthread_mutex_init")
    }
  }

  /// Destroys the mutex and releases its storage.
  func `deinit`() {
    if pthread_mutex_destroy(_mutex) != 0 {
      fatalError("pthread_mutex_destroy")
    }
    _mutex.deallocate(capacity: 1)
  }

  /// Runs `body` while holding the mutex and returns its result.
  ///
  /// A failing lock or unlock call is treated as a fatal error rather than
  /// being ignored, because continuing would run `body` unprotected.
  func withLock<Result>(_ body: () -> Result) -> Result {
    if pthread_mutex_lock(_mutex) != 0 {
      fatalError("pthread_mutex_lock")
    }
    let result = body()
    if pthread_mutex_unlock(_mutex) != 0 {
      fatalError("pthread_mutex_unlock")
    }
    return result
  }
}
/// A thin wrapper around a heap-allocated `pthread_cond_t`.
///
/// The struct has no automatic cleanup: whoever owns it must call
/// `` `deinit`() `` exactly once when the condition is no longer needed.
struct _ForkJoinCond {
  var _cond: UnsafeMutablePointer<pthread_cond_t>

  /// Allocates and initializes the condition variable with default
  /// attributes.
  init() {
    _cond = UnsafeMutablePointer.allocate(capacity: 1)
    if pthread_cond_init(_cond, nil) != 0 {
      fatalError("pthread_cond_init")
    }
  }

  /// Destroys the condition variable and releases its storage.
  func `deinit`() {
    if pthread_cond_destroy(_cond) != 0 {
      fatalError("pthread_cond_destroy")
    }
    _cond.deallocate(capacity: 1)
  }

  /// Wakes a thread blocked in `wait(_:)`, if there is one.
  func signal() {
    if pthread_cond_signal(_cond) != 0 {
      fatalError("pthread_cond_signal")
    }
  }

  /// Blocks on the condition, releasing `mutex` while blocked.
  ///
  /// `pthread_cond_wait` requires the caller to hold `mutex`, so call this
  /// from inside `mutex.withLock`. Wakeups can be spurious; re-check the
  /// predicate in a loop.
  func wait(_ mutex: _ForkJoinMutex) {
    if pthread_cond_wait(_cond, mutex._mutex) != 0 {
      fatalError("pthread_cond_wait")
    }
  }
}
/// An event that starts unset and can be set once; threads can block until
/// it is set.
final class _ForkJoinOneShotEvent {
  var _mutex: _ForkJoinMutex = _ForkJoinMutex()
  var _cond: _ForkJoinCond = _ForkJoinCond()
  var _isSet: Bool = false

  init() {}

  deinit {
    // The wrappers do not clean up after themselves; release them here.
    _cond.`deinit`()
    _mutex.`deinit`()
  }

  /// Marks the event as set and signals the condition. Calling it again
  /// after the event is set has no effect.
  func set() {
    _mutex.withLock {
      if !_isSet {
        _isSet = true
        // NOTE(review): `signal()` wakes a single waiter; if several threads
        // can wait on one event, a broadcast is needed here.
        _cond.signal()
      }
    }
  }

  /// Establishes a happens-before relation between calls to set() and wait().
  func wait() {
    _mutex.withLock {
      // Block on the condition (which releases the mutex) instead of spinning
      // with the mutex held, which would keep `set()` from ever running.
      while !_isSet {
        _cond.wait(_mutex)
      }
    }
  }

  /// If the function returns true, it establishes a happens-before relation
  /// between calls to set() and isSet().
  func isSet() -> Bool {
    return _mutex.withLock {
      return _isSet
    }
  }
}
/// A mutex-protected double-ended queue of work items.
final class _ForkJoinWorkDeque<T> {
  // FIXME: this is just a proof-of-concept; very inefficient.
  // Implementation note: adding elements to the head of the deque is common in
  // fork-join, so _deque is stored reversed (appending to an array is cheap).
  // FIXME: ^ that is false for submission queues though.
  var _deque: ContiguousArray<T> = []
  var _dequeMutex: _ForkJoinMutex = _ForkJoinMutex()

  init() {}

  deinit {
    // The mutex wrapper does not clean up after itself.
    _dequeMutex.`deinit`()
  }

  /// Whether the deque currently holds no elements.
  var isEmpty: Bool {
    return _dequeMutex.withLock {
      return _deque.isEmpty
    }
  }

  /// Adds `element` at the head (the end of the reversed storage).
  func prepend(_ element: T) {
    _dequeMutex.withLock {
      _deque.append(element)
    }
  }

  /// Removes and returns the head element, or `nil` if the deque is empty.
  func tryTakeFirst() -> T? {
    return _dequeMutex.withLock {
      // `popLast()` removes the element; reading `.last` alone would hand out
      // the same item on every call.
      return _deque.popLast()
    }
  }

  /// Removes and returns up to two elements from the head, in head-first
  /// order. A missing element is reported as `nil`.
  func tryTakeFirstTwo() -> (T?, T?) {
    return _dequeMutex.withLock {
      let first = _deque.popLast()
      let second = _deque.popLast()
      return (first, second)
    }
  }

  /// Adds `element` at the tail (the start of the reversed storage).
  func append(_ element: T) {
    _dequeMutex.withLock {
      _deque.insert(element, at: 0)
    }
  }

  /// Removes and returns the tail element, or `nil` if the deque is empty.
  func tryTakeLast() -> T? {
    return _dequeMutex.withLock {
      let result = _deque.first
      if _deque.count > 0 {
        _deque.remove(at: 0)
      }
      return result
    }
  }

  /// Removes and returns every element, in storage (reversed) order.
  func takeAll() -> ContiguousArray<T> {
    return _dequeMutex.withLock {
      let result = _deque
      _deque = []
      return result
    }
  }

  /// Replaces the first stored element equivalent to `value`.
  ///
  /// - Parameters:
  ///   - value: the element to look for.
  ///   - makeReplacement: produces the element stored in its place; called
  ///     only when a match is found.
  ///   - isEquivalent: the matching predicate, called as
  ///     `isEquivalent(storedElement, value)`.
  /// - Returns: `true` if an element was replaced, `false` otherwise.
  func tryReplace(
    _ value: T,
    makeReplacement: @escaping () -> T,
    isEquivalent: @escaping (T, T) -> Bool
  ) -> Bool {
    return _dequeMutex.withLock {
      for i in _deque.indices {
        if isEquivalent(_deque[i], value) {
          _deque[i] = makeReplacement()
          return true
        }
      }
      return false
    }
  }
}
// A pool worker: owns a submission queue and a work deque, and records the
// id of the thread it runs on.
// NOTE(review): this copy of the class has lost closing braces and several
// statement bodies (see the notes below); restore them from the upstream file
// before changing behavior.
final class _ForkJoinWorkerThread {
// Thread id; nil until `_thread()` assigns it.
internal var _tid: _stdlib_pthread_t?
internal let _pool: ForkJoinPool
internal let _submissionQueue: _ForkJoinWorkDeque<ForkJoinTaskBase>
internal let _workDeque: _ForkJoinWorkDeque<ForkJoinTaskBase>
// Stores the pool and the two queues; `_tid` starts out nil.
internal init(
_pool: ForkJoinPool,
submissionQueue: _ForkJoinWorkDeque<ForkJoinTaskBase>,
workDeque: _ForkJoinWorkDeque<ForkJoinTaskBase>
) {
self._tid = nil
self._pool = _pool
self._submissionQueue = submissionQueue
self._workDeque = workDeque
// Picks a dispatch queue and submits an async block to it.
// NOTE(review): both `queue = ...` assignments are garbled in this copy and
// the body of the async block is missing — presumably it runs `_thread()`;
// confirm against upstream.
internal func startAsync() {
var queue: DispatchQueue?
if #available(OSX 10.10, iOS 8.0, *) {
queue = .background)
} else {
queue = .background)
queue!.async {
// Worker loop: records the current thread id, then loops while either queue
// is non-empty, and finally decrements the pool's total thread count.
// NOTE(review): the bodies of the `if` statements below (running the task,
// leaving the loop, returning early) are missing in this copy.
internal func _thread() {
print("_ForkJoinWorkerThread begin")
_tid = _stdlib_pthread_self()
outer: while !_workDeque.isEmpty || !_submissionQueue.isEmpty {
while true {
if _pool._tryStopThread() {
print("_ForkJoinWorkerThread detected too many threads")
print("_ForkJoinWorkerThread end")
// Process tasks in FIFO order: first the work queue, then the
// submission queue.
if let task = _workDeque.tryTakeFirst() {
if let task = _submissionQueue.tryTakeFirst() {
print("_ForkJoinWorkerThread stealing tasks")
if let task = _pool._stealTask() {
// FIXME: steal from submission queues?
_ = _pool._totalThreads.fetchAndAdd(-1)
print("_ForkJoinWorkerThread end")
// Forks `task`: first asks the pool to create a new thread for it.
// NOTE(review): the fallback taken when no thread could be created is
// missing in this copy.
internal func _forkTask(_ task: ForkJoinTaskBase) {
// Try to inflate the pool.
if !_pool._tryCreateThread({ task }) {
// Waits for `task`, looping until it reports completion.
// NOTE(review): the `tryReplace` call has lost its first argument and the
// bodies of the branches are missing in this copy.
internal func _waitForTask(_ task: ForkJoinTaskBase) {
while true {
if task._isComplete() {
// If the task is in work queue of the current thread, run the task.
if _workDeque.tryReplace(
makeReplacement: { ForkJoinTask<()>() {} },
isEquivalent: { $0 === $1 }) {
// We found the task. Run it in-place.
// FIXME: also check the submission queue, maybe the task is there?
// FIXME: try to find the task in other threads' queues.
// FIXME: try to find tasks that were forked from this task in other
// threads' queues. Help thieves by stealing those tasks back.
// At this point, we can't do any work to help with running this task.
// We can't start new work either (if we do, we might end up creating
// more in-flight work than we can chew, and crash with out-of-memory
// errors).
_pool._compensateForBlockedWorkerThread() {
// FIXME: do a timed wait, and retry stealing.
// A handle to a value that becomes available later.
internal protocol _Future {
/// The type of the value produced by the future.
associatedtype Result
/// Establishes a happens-before relation between completing the future and
/// the call to wait().
func wait()
// Non-blocking accessors; return an optional result.
// NOTE(review): the Get/Take naming suggests "take" also clears the stored
// value (as `ForkJoinTask.tryTakeResult` does) — confirm for other conformers.
func tryGetResult() -> Result?
func tryTakeResult() -> Result?
// Accessors that return a non-optional result; by their names they wait for
// the future to complete first.
func waitAndGetResult() -> Result
func waitAndTakeResult() -> Result
// Type-erased base class for fork-join tasks: carries the owning pool and a
// completion event.
// NOTE(review): this copy has lost closing braces, the initializer expression
// of `_completedEvent`, and the bodies of `_blockingWait`, `_run`, `fork` and
// `wait`; restore them from upstream before changing behavior.
public class ForkJoinTaskBase {
// The pool this task belongs to; `fork()` requires it to still be nil.
final internal var _pool: ForkJoinPool?
// FIXME(performance): there is no need to create heavy-weight
// synchronization primitives every time. We could start with a lightweight
// atomic int for the flag and inflate to a full event when needed. Unless
// we really need to block in wait(), we would avoid creating an event.
final internal let _completedEvent: _ForkJoinOneShotEvent =
// Whether the completion event has been set.
final internal func _isComplete() -> Bool {
return _completedEvent.isSet()
final internal func _blockingWait() {
// Overridden by subclasses (see `ForkJoinTask._run`).
internal func _run() {
// Forks the task; traps if the task already has a pool. Branches on whether
// the calling thread is a registered worker thread.
final public func fork() {
precondition(_pool == nil)
if let thread = ForkJoinPool._getCurrentThread() {
} else {
// FIXME: decide if we want to allow this.
// Waits for the task; branches on whether the calling thread is a registered
// worker thread.
final public func wait() {
if let thread = ForkJoinPool._getCurrentThread() {
} else {
/// A fork-join task that runs a closure and stores its result.
final public class ForkJoinTask<Result> : ForkJoinTaskBase, _Future {
  /// The work to perform.
  internal let _task: () -> Result

  /// The stored result; `nil` until the task completes, and again after the
  /// result has been taken.
  internal var _result: Result?

  public init(_task: @escaping () -> Result) {
    self._task = _task
  }

  /// Runs the closure and publishes its result.
  override internal func _run() {
    _complete(_task())
  }

  /// It is not allowed to call _complete() in a racy way. Only one thread
  /// should ever call _complete().
  internal func _complete(_ result: Result) {
    _result = result
    // Publish the result: without this, `tryGetResult()` never sees the task
    // as complete and waiters are never released.
    _completedEvent.set()
  }

  /// Returns the result if the task has completed, otherwise `nil`.
  public func tryGetResult() -> Result? {
    if _completedEvent.isSet() {
      return _result
    }
    return nil
  }

  /// Returns the result and clears the stored copy if the task has
  /// completed, otherwise returns `nil`.
  public func tryTakeResult() -> Result? {
    if _completedEvent.isSet() {
      let result = _result
      _result = nil
      return result
    }
    return nil
  }

  /// Waits for the task to complete, then returns its result.
  ///
  /// Traps if the result was already removed with a "take" accessor.
  public func waitAndGetResult() -> Result {
    // Wait first; unwrapping before completion would trap on `nil`.
    wait()
    return tryGetResult()!
  }

  /// Waits for the task to complete, then returns its result and clears the
  /// stored copy.
  ///
  /// Traps if the result was already removed with a "take" accessor.
  public func waitAndTakeResult() -> Result {
    wait()
    return tryTakeResult()!
  }
}
// A pool of worker threads that run fork-join tasks.
// NOTE(review): this copy of the class has lost closing braces and many
// statement bodies (see the notes below); restore them from the upstream file
// before changing behavior.
final public class ForkJoinPool {
// Registry mapping a thread id to its worker object; accessed under
// `_threadRegistryMutex`.
internal static var _threadRegistry: [_stdlib_pthread_t : _ForkJoinWorkerThread] = [:]
internal static var _threadRegistryMutex: _ForkJoinMutex = _ForkJoinMutex()
// Returns the worker registered for the calling thread, or nil if there is
// none.
internal static func _getCurrentThread() -> _ForkJoinWorkerThread? {
return _threadRegistryMutex.withLock {
return _threadRegistry[_stdlib_pthread_self()]
// Thread-count limit checked by `_tryCreateThread` and `_tryStopThread`.
internal let _maxThreads: Int
/// Total number of threads: number of running threads plus the number of
/// threads that are preparing to start).
internal let _totalThreads: _stdlib_AtomicInt = _stdlib_AtomicInt(0)
// Three parallel arrays: `_removeRunningThread` removes the same index from
// each of them. Each array has its own mutex.
internal var _runningThreads: [_ForkJoinWorkerThread] = []
internal var _runningThreadsMutex: _ForkJoinMutex = _ForkJoinMutex()
internal var _submissionQueues: [_ForkJoinWorkDeque<ForkJoinTaskBase>] = []
internal var _submissionQueuesMutex: _ForkJoinMutex = _ForkJoinMutex()
internal var _workDeques: [_ForkJoinWorkDeque<ForkJoinTaskBase>] = []
internal var _workDequesMutex: _ForkJoinMutex = _ForkJoinMutex()
// Initializer used for `commonPool`; sets the thread limit from
// `_stdlib_getHardwareConcurrency()`.
internal init(_commonPool: ()) {
self._maxThreads = _stdlib_getHardwareConcurrency()
// NOTE(review): the deinit body is missing in this copy.
deinit {
// Registers `thread` in the static registry while holding all four mutexes
// (registry, running threads, submission queues, work deques — in that order).
// NOTE(review): the statements that add the thread and its queues to the
// per-pool arrays are missing in this copy.
internal func _addRunningThread(_ thread: _ForkJoinWorkerThread) {
ForkJoinPool._threadRegistryMutex.withLock {
_runningThreadsMutex.withLock {
_submissionQueuesMutex.withLock {
_workDequesMutex.withLock {
ForkJoinPool._threadRegistry[thread._tid!] = thread
// Unregisters `thread` and removes its entries from the three parallel
// arrays, holding the same four mutexes in the same order. Traps if `thread`
// is not in `_runningThreads` or has no thread id.
internal func _removeRunningThread(_ thread: _ForkJoinWorkerThread) {
ForkJoinPool._threadRegistryMutex.withLock {
_runningThreadsMutex.withLock {
_submissionQueuesMutex.withLock {
_workDequesMutex.withLock {
let i = _runningThreads.index { $0 === thread }!
ForkJoinPool._threadRegistry[thread._tid!] = nil
_runningThreads.remove(at: i)
_submissionQueues.remove(at: i)
_workDeques.remove(at: i)
// Creates an additional worker and increments the total thread count.
// NOTE(review): the statements that start the worker and invoke
// `blockingBody` are missing in this copy.
internal func _compensateForBlockedWorkerThread(_ blockingBody: @escaping () -> ()) {
// FIXME: limit the number of compensating threads.
let submissionQueue = _ForkJoinWorkDeque<ForkJoinTaskBase>()
let workDeque = _ForkJoinWorkDeque<ForkJoinTaskBase>()
let thread = _ForkJoinWorkerThread(
_pool: self, submissionQueue: submissionQueue, workDeque: workDeque)
_ = _totalThreads.fetchAndAdd(1)
// Reserves a thread slot with a compare-exchange loop on `_totalThreads`;
// returns false when the count has already reached `_maxThreads`. After a
// successful reservation it calls `makeTask()`; if that returns nil the
// reservation is released again.
// NOTE(review): the statements that hand the task to the new worker and
// start it are missing in this copy.
internal func _tryCreateThread(
_ makeTask: () -> ForkJoinTaskBase?
) -> Bool {
var success = false
var oldNumThreads = _totalThreads.load()
repeat {
if oldNumThreads >= _maxThreads {
return false
success = _totalThreads.compareExchange(
expected: &oldNumThreads, desired: oldNumThreads + 1)
} while !success
if let task = makeTask() {
let submissionQueue = _ForkJoinWorkDeque<ForkJoinTaskBase>()
let workDeque = _ForkJoinWorkDeque<ForkJoinTaskBase>()
let thread = _ForkJoinWorkerThread(
_pool: self, submissionQueue: submissionQueue, workDeque: workDeque)
} else {
_ = _totalThreads.fetchAndAdd(-1)
return true
// Scans the work deques, starting at a random offset and wrapping around,
// and returns the first task obtained with `tryTakeLast()`; nil if none.
// NOTE(review): behavior of `pickRandom` on an empty index range is not
// visible here — confirm it is safe when no deques are registered.
internal func _stealTask() -> ForkJoinTaskBase? {
return _workDequesMutex.withLock {
let randomOffset = pickRandom(_workDeques.indices)
let count = _workDeques.count
for i in _workDeques.indices {
let index = (i + randomOffset) % count
if let task = _workDeques[index].tryTakeLast() {
return task
return nil
/// Check if the pool has grown too large because of compensating
/// threads.
// Returns true after atomically decrementing `_totalThreads`; returns false
// without changing it while the count is at most `_maxThreads + 2`.
internal func _tryStopThread() -> Bool {
var success = false
var oldNumThreads = _totalThreads.load()
repeat {
// FIXME: magic number 2.
if oldNumThreads <= _maxThreads + 2 {
return false
success = _totalThreads.compareExchange(
expected: &oldNumThreads, desired: oldNumThreads - 1)
} while !success
return true
// Iterates over `tasks` while holding the submission-queues mutex.
// NOTE(review): the early return for empty input and the per-task statement
// are missing in this copy.
internal func _submitTasksToRandomWorkers<
C : Collection
>(_ tasks: C)
where C.Iterator.Element == ForkJoinTaskBase {
if tasks.isEmpty {
_submissionQueuesMutex.withLock {
for task in tasks {
// Forks `task`: loops, first trying to create a new thread for it and
// otherwise checking the existing submission queues.
// NOTE(review): the statements that enqueue the task and leave the loop are
// missing in this copy.
public func forkTask(_ task: ForkJoinTaskBase) {
while true {
// Try to inflate the pool first.
if _tryCreateThread({ task }) {
// Looks like we can't create more threads. Submit the task to
// a random thread.
let done = _submissionQueuesMutex.withLock {
() -> Bool in
if !_submissionQueues.isEmpty {
return true
return false
if done {
// FIXME: return a Future instead?
// Wraps `task` in a `ForkJoinTask` and returns it.
// NOTE(review): the statement that forks the new task is missing in this
// copy.
public func forkTask<Result>(task: @escaping () -> Result) -> ForkJoinTask<Result> {
let forkJoinTask = ForkJoinTask(_task: task)
return forkJoinTask
// The shared pool instance.
public static var commonPool = ForkJoinPool(_commonPool: ())
// Variadic convenience form of `invokeAll`.
// NOTE(review): its body is missing in this copy.
public static func invokeAll(_ tasks: ForkJoinTaskBase...) {
// Handles `tasks` differently depending on whether the calling thread is a
// registered worker thread.
// NOTE(review): the statements that run, fork and wait for the tasks are
// missing in this copy.
public static func invokeAll(_ tasks: [ForkJoinTaskBase]) {
if tasks.isEmpty {
if ForkJoinPool._getCurrentThread() != nil {
// Run the first task in this thread, fork the rest.
let first = tasks.first
for t in tasks.dropFirst() {
// FIXME: optimize forking in bulk.
} else {
// FIXME: decide if we want to allow this.
// Collection transformation DSL: implementation
// One step of a transformation pipeline.
// NOTE(review): the `>(` and `) where` lines of the `transform` signature and
// the closing brace are missing in this copy.
internal protocol _CollectionTransformerStepProtocol /*: class*/ {
// Element type of the collection at the start of the pipeline.
associatedtype PipelineInputElement
// Element type this step passes to the collector.
associatedtype OutputElement
// Takes a collection `c`, an index `range` within it, and an inout
// `collector` whose element type is `OutputElement`.
func transform<
InputCollection : Collection,
Collector : _ElementCollector
_ c: InputCollection,
_ range: Range<InputCollection.Index>,
_ collector: inout Collector
InputCollection.Iterator.Element == PipelineInputElement,
Collector.Element == OutputElement
// Abstract base class for pipeline steps: every method traps with
// "abstract method" and is meant to be overridden.
// NOTE(review): closing braces and the `>(` / `) where` / `where` lines of
// the generic signatures are missing in this copy.
internal class _CollectionTransformerStep<PipelineInputElement_, OutputElement_>
: _CollectionTransformerStepProtocol {
typealias PipelineInputElement = PipelineInputElement_
typealias OutputElement = OutputElement_
// Returns a step whose output elements have type `U`.
func map<U>(_ transform: @escaping (OutputElement) -> U)
-> _CollectionTransformerStep<PipelineInputElement, U> {
fatalError("abstract method")
// Returns a step with the same output element type.
func filter(_ isIncluded: @escaping (OutputElement) -> Bool)
-> _CollectionTransformerStep<PipelineInputElement, OutputElement> {
fatalError("abstract method")
// Returns a finalizer that produces a single `U`.
func reduce<U>(_ initial: U, _ combine: @escaping (U, OutputElement) -> U)
-> _CollectionTransformerFinalizer<PipelineInputElement, U> {
fatalError("abstract method")
// Returns a finalizer that produces a collection of type `C`.
func collectTo<
C : BuildableCollectionProtocol
>(_: C.Type) -> _CollectionTransformerFinalizer<PipelineInputElement, C>
C.Builder.Destination == C,
C.Builder.Element == C.Iterator.Element,
C.Iterator.Element == OutputElement {
fatalError("abstract method")
// Takes a collection, an index range within it and an inout collector.
func transform<
InputCollection : Collection,
Collector : _ElementCollector
_ c: InputCollection,
_ range: Range<InputCollection.Index>,
_ collector: inout Collector
InputCollection.Iterator.Element == PipelineInputElement,
Collector.Element == OutputElement {
fatalError("abstract method")
/// The first step of a pipeline: it forwards the input collection's elements
/// unchanged.
final internal class _CollectionTransformerStepCollectionSource<
  PipelineInputElement
> : _CollectionTransformerStep<PipelineInputElement, PipelineInputElement> {

  typealias InputElement = PipelineInputElement

  /// Returns a step that applies `transform` to every input element.
  override func map<U>(_ transform: @escaping (InputElement) -> U)
    -> _CollectionTransformerStep<PipelineInputElement, U> {
    return _CollectionTransformerStepOneToMaybeOne(self) {
      transform($0)
    }
  }

  /// Returns a step that keeps only the elements for which `isIncluded`
  /// returns `true`.
  override func filter(_ isIncluded: @escaping (InputElement) -> Bool)
    -> _CollectionTransformerStep<PipelineInputElement, InputElement> {
    return _CollectionTransformerStepOneToMaybeOne(self) {
      isIncluded($0) ? $0 : nil
    }
  }

  /// Returns a finalizer that folds the elements with `combine`, starting
  /// from `initial`.
  override func reduce<U>(_ initial: U, _ combine: @escaping (U, InputElement) -> U)
    -> _CollectionTransformerFinalizer<PipelineInputElement, U> {
    return _CollectionTransformerFinalizerReduce(self, initial, combine)
  }

  /// Returns a finalizer that gathers the elements into a `C`.
  override func collectTo<
    C : BuildableCollectionProtocol
  >(_ c: C.Type) -> _CollectionTransformerFinalizer<PipelineInputElement, C>
    where
    C.Builder.Destination == C,
    C.Builder.Element == C.Iterator.Element,
    C.Iterator.Element == OutputElement {
    return _CollectionTransformerFinalizerCollectTo(self, c)
  }

  /// Feeds every element of `c` whose index lies in `range` to `collector`,
  /// in index order.
  override func transform<
    InputCollection : Collection,
    Collector : _ElementCollector
  >(
    _ c: InputCollection,
    _ range: Range<InputCollection.Index>,
    _ collector: inout Collector
  ) where
    InputCollection.Iterator.Element == PipelineInputElement,
    Collector.Element == OutputElement {
    var i = range.lowerBound
    while i != range.upperBound {
      let e = c[i]
      // Hand the element on; without this the pipeline produces nothing.
      collector.append(e)
      c.formIndex(after: &i)
    }
  }
}
// A step that maps each element produced by `_input` to zero or one output
// element: `_transform` returns nil to drop an element.
// NOTE(review): the generic parameter lines before `InputStep`, the
// `>(` / `) where` / `where` lines of the signatures and the closing braces
// are missing in this copy.
final internal class _CollectionTransformerStepOneToMaybeOne<
InputStep : _CollectionTransformerStepProtocol
> : _CollectionTransformerStep<PipelineInputElement, OutputElement>
where InputStep.PipelineInputElement == PipelineInputElement {
typealias _Self = _CollectionTransformerStepOneToMaybeOne
typealias InputElement = InputStep.OutputElement
// The step that feeds this one.
let _input: InputStep
// Per-element function; nil means "drop this element".
let _transform: (InputElement) -> OutputElement?
init(_ input: InputStep, _ transform: @escaping (InputElement) -> OutputElement?) {
self._input = input
self._transform = transform
// Fuses `transform` into this step: the new step is built directly on
// `_input` and applies `transform` to whatever `_transform` lets through.
override func map<U>(_ transform: @escaping (OutputElement) -> U)
-> _CollectionTransformerStep<PipelineInputElement, U> {
// Let the closure below capture only one variable, not the whole `self`.
let localTransform = _transform
return _CollectionTransformerStepOneToMaybeOne<PipelineInputElement, U, InputStep>(_input) {
(input: InputElement) -> U? in
if let e = localTransform(input) {
return transform(e)
return nil
// Fuses `isIncluded` into this step: the new step is built directly on
// `_input` and drops elements that `_transform` drops or `isIncluded`
// rejects.
override func filter(_ isIncluded: @escaping (OutputElement) -> Bool)
-> _CollectionTransformerStep<PipelineInputElement, OutputElement> {
// Let the closure below capture only one variable, not the whole `self`.
let localTransform = _transform
return _CollectionTransformerStepOneToMaybeOne<PipelineInputElement, OutputElement, InputStep>(_input) {
(input: InputElement) -> OutputElement? in
if let e = localTransform(input) {
return isIncluded(e) ? e : nil
return nil
// Wraps this step in a reducing finalizer.
override func reduce<U>(_ initial: U, _ combine: @escaping (U, OutputElement) -> U)
-> _CollectionTransformerFinalizer<PipelineInputElement, U> {
return _CollectionTransformerFinalizerReduce(self, initial, combine)
// Wraps this step in a collecting finalizer.
override func collectTo<
C : BuildableCollectionProtocol
>(_ c: C.Type) -> _CollectionTransformerFinalizer<PipelineInputElement, C>
C.Builder.Destination == C,
C.Builder.Element == C.Iterator.Element,
C.Iterator.Element == OutputElement {
return _CollectionTransformerFinalizerCollectTo(self, c)
// Runs `_input` with a collector wrapper that applies `_transform`, then
// copies the wrapper's base collector back into `collector`.
override func transform<
InputCollection : Collection,
Collector : _ElementCollector
_ c: InputCollection,
_ range: Range<InputCollection.Index>,
_ collector: inout Collector
InputCollection.Iterator.Element == PipelineInputElement,
Collector.Element == OutputElement {
var collectorWrapper =
_ElementCollectorOneToMaybeOne(collector, _transform)
_input.transform(c, range, &collectorWrapper)
collector = collectorWrapper._baseCollector
/// A collector adapter: applies `_transform` to each incoming element and
/// forwards the non-nil results to `_baseCollector`.
struct _ElementCollectorOneToMaybeOne<
  BaseCollector : _ElementCollector,
  Element_
> : _ElementCollector {
  typealias Element = Element_

  /// The collector that receives the transformed elements.
  var _baseCollector: BaseCollector

  /// Per-element function; `nil` means "drop this element".
  var _transform: (Element) -> BaseCollector.Element?

  init(
    _ baseCollector: BaseCollector,
    _ transform: @escaping (Element) -> BaseCollector.Element?
  ) {
    self._baseCollector = baseCollector
    self._transform = transform
  }

  /// Ignored: the number of elements that survive `_transform` is unknown.
  mutating func sizeHint(_ approximateSize: Int) {}

  /// Transforms `element` and forwards it unless the transform returns `nil`.
  mutating func append(_ element: Element) {
    if let e = _transform(element) {
      _baseCollector.append(e)
    }
  }

  /// Transforms and forwards every element of `elements`, in order.
  mutating func append<
    C : Collection
  >(contentsOf elements: C)
    where C.Iterator.Element == Element {
    for e in elements {
      append(e)
    }
  }
}
// A sink that pipeline steps feed elements into.
protocol _ElementCollector {
// The type of element the collector accepts.
associatedtype Element
// Receives an approximate element count.
mutating func sizeHint(_ approximateSize: Int)
// Receives a single element.
mutating func append(_ element: Element)
// Receives a whole collection of elements.
mutating func append<
C : Collection
>(contentsOf elements: C)
where C.Iterator.Element == Element
/// Abstract base class for the terminal operation of a pipeline: subclasses
/// turn an input collection into a `Result`.
class _CollectionTransformerFinalizer<PipelineInputElement, Result> {
  /// Runs the pipeline over `c`. Subclasses must override this method.
  func transform<
    InputCollection : Collection
  >(_ c: InputCollection) -> Result
    where InputCollection.Iterator.Element == PipelineInputElement {
    // Trap like the abstract step class does; there is nothing to return here.
    fatalError("abstract method")
  }
}
// Finalizer that folds the output of `_input` into a single value.
// NOTE(review): the generic parameter lines before `InputStep`, the `where`
// keyword line and the closing braces are missing in this copy.
final class _CollectionTransformerFinalizerReduce<
InputStep : _CollectionTransformerStepProtocol
> : _CollectionTransformerFinalizer<PipelineInputElement, U>
InputStep.OutputElement == InputElementTy,
InputStep.PipelineInputElement == PipelineInputElement {
// The step that produces the elements to fold.
var _input: InputStep
// The starting value of the fold.
var _initial: U
// Combines the running value with the next element.
var _combine: (U, InputElementTy) -> U
init(_ input: InputStep, _ initial: U, _ combine: @escaping (U, InputElementTy) -> U) {
self._input = input
self._initial = initial
self._combine = combine
// Runs `_input` over the full index range of `c` into a reducing collector
// and returns the collector's result.
override func transform<
InputCollection : Collection
>(_ c: InputCollection) -> U
where InputCollection.Iterator.Element == PipelineInputElement {
var collector = _ElementCollectorReduce(_initial, _combine)
_input.transform(c, c.startIndex..<c.endIndex, &collector)
return collector.takeResult()
/// A collector that folds every element it receives into a running value.
struct _ElementCollectorReduce<Element_, Result> : _ElementCollector {
  typealias Element = Element_

  /// The running value of the fold.
  var _current: Result

  /// Combines the running value with the next element.
  var _combine: (Result, Element) -> Result

  init(_ initial: Result, _ combine: @escaping (Result, Element) -> Result) {
    self._current = initial
    self._combine = combine
  }

  /// Ignored: a fold needs no storage for its elements.
  mutating func sizeHint(_ approximateSize: Int) {}

  /// Folds `element` into the running value.
  mutating func append(_ element: Element) {
    _current = _combine(_current, element)
  }

  /// Folds every element of `elements` into the running value, in order.
  mutating func append<
    C : Collection
  >(contentsOf elements: C)
    where C.Iterator.Element == Element {
    for e in elements {
      append(e)
    }
  }

  /// Returns the current value of the fold.
  mutating func takeResult() -> Result {
    return _current
  }
}
// Finalizer that gathers the output of `_input` into a collection of type
// `U`.
// NOTE(review): the generic parameter lines before `U`, the `where` keyword
// line and the closing braces are missing in this copy.
final class _CollectionTransformerFinalizerCollectTo<
U : BuildableCollectionProtocol,
InputStep : _CollectionTransformerStepProtocol
> : _CollectionTransformerFinalizer<PipelineInputElement, U>
InputStep.OutputElement == InputElementTy,
InputStep.PipelineInputElement == PipelineInputElement,
U.Builder.Destination == U,
U.Builder.Element == U.Iterator.Element,
U.Iterator.Element == InputStep.OutputElement {
// The step that produces the elements to gather.
var _input: InputStep
// The metatype argument is not stored; it only appears in the signature.
init(_ input: InputStep, _: U.Type) {
self._input = input
// Runs `_input` over the full index range of `c` into a collecting collector
// and returns the collector's result.
override func transform<
InputCollection : Collection
>(_ c: InputCollection) -> U
where InputCollection.Iterator.Element == PipelineInputElement {
var collector = _ElementCollectorCollectTo<U>()
_input.transform(c, c.startIndex..<c.endIndex, &collector)
return collector.takeResult()
/// A collector that forwards everything it receives to the builder of
/// `BuildableCollection`.
struct _ElementCollectorCollectTo<
  BuildableCollection : BuildableCollectionProtocol
> : _ElementCollector
  where
  BuildableCollection.Builder.Destination == BuildableCollection,
  BuildableCollection.Builder.Element == BuildableCollection.Iterator.Element {

  typealias Element = BuildableCollection.Iterator.Element

  /// The builder that accumulates the elements.
  var _builder: BuildableCollection.Builder

  init() {
    self._builder = BuildableCollection.Builder()
  }

  /// Passes the size hint on to the builder.
  mutating func sizeHint(_ approximateSize: Int) {
    _builder.sizeHint(approximateSize)
  }

  /// Appends `element` to the builder.
  mutating func append(_ element: Element) {
    _builder.append(element)
  }

  /// Appends every element of `elements` to the builder.
  mutating func append<
    C : Collection
  >(contentsOf elements: C)
    where C.Iterator.Element == Element {
    _builder.append(contentsOf: elements)
  }

  /// Returns the collection built from the elements received so far.
  mutating func takeResult() -> BuildableCollection {
    return _builder.takeResult()
  }
}
/// Returns `transformer` unchanged.
internal func _optimizeCollectionTransformer<PipelineInputElement, Result>(
  _ transformer: _CollectionTransformerFinalizer<PipelineInputElement, Result>
) -> _CollectionTransformerFinalizer<PipelineInputElement, Result> {
  let unchanged = transformer
  return unchanged
}
/// Runs a finished pipeline over `c` and returns its result.
///
/// - Parameters:
///   - c: the input collection.
///   - transformer: the pipeline's terminal operation.
/// - Returns: the value produced by the transformer.
internal func _runCollectionTransformer<
  InputCollection : Collection, Result
>(
  _ c: InputCollection,
  _ transformer: _CollectionTransformerFinalizer<InputCollection.Iterator.Element, Result>
) -> Result {
  // Run the optimized pipeline; it used to be computed and then ignored in
  // favor of the original `transformer`.
  let optimized = _optimizeCollectionTransformer(transformer)
  return optimized.transform(c)
}
// Collection transformation DSL: public interface
/// A pipeline over `InputCollection` whose current stage produces elements
/// of type `T`.
///
/// `map` and `filter` extend the pipeline; `reduce`, `collectTo` and
/// `toArray` run it.
public struct CollectionTransformerPipeline<
  InputCollection : Collection, T
> {
  /// The collection the pipeline reads from.
  internal var _input: InputCollection

  /// The steps accumulated so far.
  internal var _step: _CollectionTransformerStep<InputCollection.Iterator.Element, T>

  /// Returns a pipeline that additionally applies `transform` to every
  /// element.
  public func map<U>(_ transform: @escaping (T) -> U)
    -> CollectionTransformerPipeline<InputCollection, U> {
    return CollectionTransformerPipeline<InputCollection, U>(
      _input: _input,
      _step: _step.map(transform)
    )
  }

  /// Returns a pipeline that additionally keeps only the elements for which
  /// `isIncluded` returns `true`.
  public func filter(_ isIncluded: @escaping (T) -> Bool)
    -> CollectionTransformerPipeline<InputCollection, T> {
    return CollectionTransformerPipeline<InputCollection, T>(
      _input: _input,
      _step: _step.filter(isIncluded)
    )
  }

  /// Runs the pipeline and folds its elements with `combine`, starting from
  /// `initial`.
  public func reduce<U>(
    _ initial: U, _ combine: @escaping (U, T) -> U
  ) -> U {
    return _runCollectionTransformer(_input, _step.reduce(initial, combine))
  }

  /// Runs the pipeline and gathers its elements into a collection of type
  /// `C`.
  public func collectTo<
    C : BuildableCollectionProtocol
  >(_ c: C.Type) -> C
    where
    C.Builder.Destination == C,
    C.Iterator.Element == T,
    C.Builder.Element == T {
    return _runCollectionTransformer(_input, _step.collectTo(c))
  }

  /// Runs the pipeline and gathers its elements into an array.
  public func toArray() -> [T] {
    return collectTo(Array<T>.self)
  }
}
/// Starts a pipeline over `c` whose first stage yields the elements of `c`.
public func transform<C : Collection>(_ c: C)
  -> CollectionTransformerPipeline<C, C.Iterator.Element> {
  let source = _CollectionTransformerStepCollectionSource<C.Iterator.Element>()
  return CollectionTransformerPipeline<C, C.Iterator.Element>(
    _input: c,
    _step: source)
}
// Collection transformation DSL: tests
import StdlibUnittest
var t = TestSuite("t")
// map followed by reduce: (1 + 2 + 3) * 2 == 12.
t.test("fusion/map+reduce") {
  let xs = [ 1, 2, 3 ]
  let result =
    transform(xs)
    .map { $0 * 2 }
    .reduce(0, { $0 + $1 })
  expectEqual(12, result)
}

// map, filter (which keeps everything here), then reduce.
t.test("fusion/map+filter+reduce") {
  let xs = [ 1, 2, 3 ]
  let result = transform(xs)
    .map { $0 * 2 }
    .filter { $0 != 0 }
    .reduce(0, { $0 + $1 })
  expectEqual(12, result)
}

// map followed by collecting into an explicitly named collection type.
t.test("fusion/map+collectTo") {
  let xs = [ 1, 2, 3 ]
  let result =
    transform(xs)
    .map { $0 * 2 }
    .collectTo(Array<Int>.self)
  expectEqual([ 2, 4, 6 ], result)
}

// map followed by the array convenience.
t.test("fusion/map+toArray") {
  let xs = [ 1, 2, 3 ]
  let result =
    transform(xs)
    .map { $0 * 2 }
    .toArray()
  expectEqual([ 2, 4, 6 ], result)
}
// Forks 100 small tasks on the common pool and waits for all of them.
t.test("ForkJoinPool.forkTask") {
  var tasks: [ForkJoinTask<()>] = []
  for _ in 0..<100 {
    tasks.append(ForkJoinPool.commonPool.forkTask {
      () -> () in
      // Some busy work; the value itself is not used.
      var result = 1
      for i in 0..<10000 {
        result = result &* i
      }
      return ()
    })
  }
  // Wait for every task so the test does not finish while work is in flight.
  for t in tasks {
    t.wait()
  }
}
/// Computes the `n`-th Fibonacci number (`fib(1) == fib(2) == 1`).
///
/// Values of `n` below 39 are computed by plain recursion; larger values
/// split the two recursive calls into fork-join tasks.
func fib(_ n: Int) -> Int {
  switch n {
  case 1, 2:
    return 1
  default:
    break
  }

  if n == 38 {
    print("\(pthread_self()) fib(\(n))")
  }

  guard n >= 39 else {
    let r = fib(n - 1) + fib(n - 2)
    return r
  }

  let larger = ForkJoinTask() { fib(n - 1) }
  let smaller = ForkJoinTask() { fib(n - 2) }
  ForkJoinPool.invokeAll(larger, smaller)
  // Collect the n - 2 result first, then the n - 1 result.
  let smallerValue = smaller.waitAndGetResult()
  let largerValue = larger.waitAndGetResult()
  return smallerValue + largerValue
}
// Computes fib(40) on the common pool and checks the value.
t.test("ForkJoinPool.forkTask/Fibonacci") {
  let task = ForkJoinPool.commonPool.forkTask { fib(40) }
  let value = task.waitAndGetResult()
  expectEqual(102334155, value)
}
/// Maps `transform` over `input[range]` and returns the results in a builder.
///
/// Ranges with fewer than 1000 elements are mapped directly; larger ranges
/// are split, mapped by fork-join subtasks, and merged in range order.
func _parallelMap(_ input: [Int], transform: @escaping (Int) -> Int, range: Range<Int>)
  -> Array<Int>.Builder {

  var builder = Array<Int>.Builder()
  if range.count < 1_000 {
    builder.append(contentsOf: input[range].map(transform))
  } else {
    let tasks = input.split(range).map {
      (subRange) in
      ForkJoinTask<Array<Int>.Builder> {
        _parallelMap(input, transform: transform, range: subRange)
      }
    }
    // Start the subtasks; waiting on tasks that were never submitted would
    // block forever.
    ForkJoinPool.invokeAll(tasks)
    for t in tasks {
      var otherBuilder = t.waitAndGetResult()
      // Merge each partial result, preserving the order of the sub-ranges.
      builder.moveContentsOf(&otherBuilder)
    }
  }
  return builder
}
/// Returns `input` with `transform` applied to every element, computed on
/// the common fork-join pool.
func parallelMap(_ input: [Int], transform: @escaping (Int) -> Int) -> [Int] {
  let t = ForkJoinPool.commonPool.forkTask {
    _parallelMap(
      input,
      transform: transform,
      range: input.startIndex..<input.endIndex)
  }
  var builder = t.waitAndGetResult()
  return builder.takeResult()
}
// Checks that parallelMap returns every mapped element, in order.
t.test("ForkJoinPool.forkTask/MapArray") {
  expectEqual(
    Array(2..<1_001),
    parallelMap(Array(1..<1_000)) { $0 + 1 }
  )
}
/*
 * FIXME: reduce compiler crasher
t.test("ForkJoinPool.forkTask") {
  func fib(_ n: Int) -> Int {
    if n == 0 || n == 1 {
      return 1
    }
    let t1 = ForkJoinPool.commonPool.forkTask { fib(n - 1) }
    let t2 = ForkJoinPool.commonPool.forkTask { fib(n - 2) }
    return t2.waitAndGetResult() + t1.waitAndGetResult()
  }
  expectEqual(0, fib(10))
}
*/

// Useful links: