| // This source file is part of the Swift.org open source project |
| // |
| // Copyright (c) 2014 - 2016 Apple Inc. and the Swift project authors |
| // Licensed under Apache License v2.0 with Runtime Library Exception |
| // |
| // See http://swift.org/LICENSE.txt for license information |
| // See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors |
| // |
| |
| import CoreFoundation |
| |
| // MARK: Port and related types |
| |
| public typealias SocketNativeHandle = Int32 |
| |
| extension Port { |
| public static let didBecomeInvalidNotification = NSNotification.Name(rawValue: "NSPortDidBecomeInvalidNotification") |
| } |
| |
| open class Port : NSObject, NSCopying { |
| @available(*, deprecated, message: "On Darwin, you can invoke Port() directly to produce a MessagePort. Since MessagePort's functionality is not available in swift-corelibs-foundation, you should not invoke this initializer directly. Subclasses of Port can delegate to this initializer safely.") |
| public override init() { |
| if type(of: self) == Port.self { |
| NSRequiresConcreteImplementation() |
| } |
| } |
| |
| open func copy(with zone: NSZone? = nil) -> Any { |
| return self |
| } |
| |
| open func invalidate() { |
| NSRequiresConcreteImplementation() |
| } |
| |
| open var isValid: Bool { |
| NSRequiresConcreteImplementation() |
| } |
| |
| open func setDelegate(_ anObject: PortDelegate?) { |
| NSRequiresConcreteImplementation() |
| } |
| open func delegate() -> PortDelegate? { |
| NSRequiresConcreteImplementation() |
| } |
| |
| // These two methods should be implemented by subclasses |
| // to setup monitoring of the port when added to a run loop, |
| // and stop monitoring if needed when removed; |
| // These methods should not be called directly! |
| open func schedule(in runLoop: RunLoop, forMode mode: RunLoop.Mode) { |
| NSRequiresConcreteImplementation() |
| } |
| |
| open func remove(from runLoop: RunLoop, forMode mode: RunLoop.Mode) { |
| NSRequiresConcreteImplementation() |
| } |
| |
| open var reservedSpaceLength: Int { |
| return 0 |
| } |
| |
| open func sendBeforeDate(_ limitDate: Date, components: NSMutableArray?, from receivePort: Port?, reserved headerSpaceReserved: Int) -> Bool { |
| NSRequiresConcreteImplementation() |
| } |
| |
| open func sendBeforeDate(_ limitDate: Date, msgid msgID: Int, components: NSMutableArray?, from receivePort: Port?, reserved headerSpaceReserved: Int) -> Bool { |
| return sendBeforeDate(limitDate, components: components, from: receivePort, reserved: headerSpaceReserved) |
| } |
| } |
| |
| @available(*, unavailable, message: "MessagePort is not available in swift-corelibs-foundation.") |
| open class MessagePort: Port {} |
| |
| @available(*, unavailable, message: "NSMachPort is not available in swift-corelibs-foundation.") |
| open class NSMachPort: Port {} |
| |
| extension PortDelegate { |
| func handle(_ message: PortMessage) { } |
| } |
| |
| public protocol PortDelegate : class { |
| func handle(_ message: PortMessage) |
| } |
| |
| #if canImport(Glibc) && !os(Android) |
| import Glibc |
| fileprivate let SOCK_STREAM = Int32(Glibc.SOCK_STREAM.rawValue) |
| fileprivate let SOCK_DGRAM = Int32(Glibc.SOCK_DGRAM.rawValue) |
| fileprivate let IPPROTO_TCP = Int32(Glibc.IPPROTO_TCP) |
| #endif |
| |
| #if canImport(Glibc) && os(Android) |
| import Glibc |
| fileprivate let SOCK_STREAM = Int32(Glibc.SOCK_STREAM) |
| fileprivate let SOCK_DGRAM = Int32(Glibc.SOCK_DGRAM) |
| fileprivate let IPPROTO_TCP = Int32(Glibc.IPPROTO_TCP) |
| fileprivate let INADDR_ANY: in_addr_t = 0 |
| #endif |
| |
| |
| #if canImport(WinSDK) |
| import WinSDK |
| /* |
| // https://docs.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-sockaddr_in |
| typedef struct sockaddr_in { |
| short sin_family; |
| u_short sin_port; |
| struct in_addr sin_addr; |
| char sin_zero[8]; |
| } SOCKADDR_IN, *PSOCKADDR_IN, *LPSOCKADDR_IN; |
| */ |
| fileprivate typealias sa_family_t = ADDRESS_FAMILY |
| fileprivate typealias in_port_t = USHORT |
| fileprivate typealias in_addr_t = UInt32 |
| fileprivate let IPPROTO_TCP = Int32(WinSDK.IPPROTO_TCP.rawValue) |
| #endif |
| |
| // MARK: Darwin representation of socket addresses |
| |
| /* |
| ===== YOU ARE ABOUT TO ENTER THE SADNESS ZONE ===== |
| |
| SocketPort transmits ports by sending _Darwin_ sockaddr values serialized over the wire. (Yeah.) |
| This means that whatever the platform, we need to be able to send Darwin sockaddrs and figure them out on the other side of the wire. |
| |
| Now, the vast majority of the intreresting ports that may be sent is AF_INET and AF_INET6 — other sockets aren't uncommon, but they are generally local to their host (eg. AF_UNIX). So, we make the following tactical choice: |
| |
| - swift-corelibs-foundation clients across all platforms can interoperate between themselves and with Darwin as long as all the ports that are sent through SocketPort are AF_INET or AF_INET6; |
| - otherwise, it is the implementor and deployer's responsibility to make sure all the clients are on the same platform. For sockets that do not leave the machine, like AF_UNIX, this is trivial. |
| |
| This allows us to special-case sockaddr_in and sockaddr_in6; when we transmit them, we will transmit them in a way that's binary-compatible with Darwin's version of those structure, and then we translate them into whatever version your platform uses, with the one exception that we assume that in_addr is always representable as 4 contiguous bytes, and similarly in6_addr as 16 contiguous bytes. |
| |
| Addresses are internally represented as LocalAddress enum cases; we use DarwinAddress as a type to denote a block of bytes that we type as being a sockaddr produced by Darwin or a Darwin-mimicking source; and the extensions on sockaddr_in and sockaddr_in6 paper over some platform differences and provide translations back and forth into DarwinAddresses for their specific types. |
| |
| Note that all address parameters and properties that are public are the data representation of a sockaddr generated starting from the current platform's sockaddrs, not a DarwinAddress. No code that's a client of SocketPort should be able to see the Darwin version of the data we generate internally. |
| */ |
| |
| fileprivate let darwinAfInet: UInt8 = 2 |
| fileprivate let darwinAfInet6: UInt8 = 30 |
| fileprivate let darwinSockaddrInSize: UInt8 = 16 |
| fileprivate let darwinSockaddrIn6Size: UInt8 = 28 |
| |
| fileprivate typealias DarwinIn6Addr = |
| (UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, |
| UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8, UInt8) // 16 contiguous bytes |
| fileprivate let darwinIn6AddrSize = 16 |
| |
| fileprivate extension sockaddr_in { |
| // Not all platforms have a sin_len field. This is like init(), but also sets that field if it exists. |
| init(settingLength: ()) { |
| self.init() |
| |
| #if canImport(Darwin) || os(FreeBSD) |
| self.sin_len = UInt8(MemoryLayout<sockaddr_in>.size) |
| #endif |
| } |
| |
| init?(_ address: DarwinAddress) { |
| let data = address.data |
| guard data.count == darwinSockaddrInSize, |
| data[offset: 0] == darwinSockaddrInSize, |
| data[offset: 1] == darwinAfInet else { return nil } |
| |
| var port: UInt16 = 0 |
| var inAddr: UInt32 = 0 |
| |
| data.withUnsafeBytes { (buffer) -> Void in |
| withUnsafeMutableBytes(of: &port) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[2..<4])) |
| } |
| withUnsafeMutableBytes(of: &inAddr) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[4..<8])) |
| } |
| } |
| |
| self.init(settingLength: ()) |
| self.sin_family = sa_family_t(AF_INET) |
| self.sin_port = in_port_t(port) |
| withUnsafeMutableBytes(of: &self.sin_addr) { (buffer) in |
| withUnsafeBytes(of: inAddr) { buffer.copyMemory(from: $0) } |
| } |
| } |
| |
| var darwinAddress: DarwinAddress { |
| var data = Data() |
| withUnsafeBytes(of: darwinSockaddrInSize) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: darwinAfInet) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: UInt16(sin_port)) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: sin_addr) { data.append(contentsOf: $0) } |
| |
| let padding: (Int8, Int8, Int8, Int8, Int8, Int8, Int8, Int8) = |
| (0, 0, 0, 0, 0, 0, 0, 0) |
| withUnsafeBytes(of: padding) { data.append(contentsOf: $0) } |
| |
| return DarwinAddress(data) |
| } |
| } |
| |
| fileprivate extension sockaddr_in6 { |
| init(settingLength: ()) { |
| self.init() |
| |
| #if canImport(Darwin) || os(FreeBSD) |
| self.sin6_len = UInt8(MemoryLayout<sockaddr_in6>.size) |
| #endif |
| } |
| |
| init?(_ address: DarwinAddress) { |
| let data = address.data |
| guard data.count == darwinSockaddrIn6Size, |
| data[offset: 0] == darwinSockaddrIn6Size, |
| data[offset: 1] == darwinAfInet6 else { return nil } |
| |
| var port: UInt16 = 0 |
| var flowInfo: UInt32 = 0 |
| var in6Addr: DarwinIn6Addr = |
| (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) |
| var scopeId: UInt32 = 0 |
| |
| data.withUnsafeBytes { (buffer) -> Void in |
| withUnsafeMutableBytes(of: &port) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[2..<4])) |
| } |
| withUnsafeMutableBytes(of: &flowInfo) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[4..<8])) |
| } |
| withUnsafeMutableBytes(of: &in6Addr) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[8..<24])) |
| } |
| withUnsafeMutableBytes(of: &scopeId) { |
| $0.copyMemory(from: UnsafeRawBufferPointer(rebasing: buffer[24..<28])) |
| } |
| } |
| |
| self.init(settingLength: ()) |
| self.sin6_family = sa_family_t(AF_INET6) |
| self.sin6_port = in_port_t(port) |
| self.sin6_flowinfo = flowInfo |
| withUnsafeMutableBytes(of: &self.sin6_addr) { (buffer) in |
| withUnsafeBytes(of: in6Addr) { buffer.copyMemory(from: $0) } |
| } |
| #if !os(Windows) |
| self.sin6_scope_id = scopeId |
| #endif |
| } |
| |
| var darwinAddress: DarwinAddress { |
| var data = Data() |
| withUnsafeBytes(of: darwinSockaddrIn6Size) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: darwinAfInet6) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: UInt16(sin6_port)) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: UInt32(sin6_flowinfo)) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: sin6_addr) { data.append(contentsOf: $0) } |
| |
| #if os(Windows) |
| withUnsafeBytes(of: UInt32(0)) { data.append(contentsOf: $0) } |
| #else |
| withUnsafeBytes(of: UInt32(sin6_scope_id)) { data.append(contentsOf: $0) } |
| #endif |
| |
| return DarwinAddress(data) |
| } |
| } |
| |
| enum LocalAddress: Hashable { |
| case ipv4(sockaddr_in) |
| case ipv6(sockaddr_in6) |
| case other(Data) |
| |
| init(_ data: Data) { |
| if data.count == MemoryLayout<sockaddr_in>.size { |
| let sinAddr = data.withUnsafeBytes { $0.baseAddress!.load(as: sockaddr_in.self) } |
| if sinAddr.sin_family == sa_family_t(AF_INET) { |
| self = .ipv4(sinAddr) |
| return |
| } |
| } |
| |
| if data.count == MemoryLayout<sockaddr_in6>.size { |
| let sinAddr = data.withUnsafeBytes { $0.baseAddress!.load(as: sockaddr_in6.self) } |
| if sinAddr.sin6_family == sa_family_t(AF_INET6) { |
| self = .ipv6(sinAddr) |
| return |
| } |
| } |
| |
| self = .other(data) |
| } |
| |
| init(_ darwinAddress: DarwinAddress) { |
| let data = Data(darwinAddress.data) |
| if data[offset: 1] == UInt8(AF_INET), let sinAddr = sockaddr_in(darwinAddress) { |
| self = .ipv4(sinAddr); return |
| } |
| |
| if data[offset: 1] == UInt8(AF_INET6), let sinAddr = sockaddr_in6(darwinAddress) { |
| self = .ipv6(sinAddr); return |
| } |
| |
| self = .other(darwinAddress.data) |
| } |
| |
| var data: Data { |
| switch self { |
| case .ipv4(let sinAddr): |
| return withUnsafeBytes(of: sinAddr) { Data($0) } |
| case .ipv6(let sinAddr): |
| return withUnsafeBytes(of: sinAddr) { Data($0) } |
| case .other(let data): |
| return data |
| } |
| } |
| |
| func hash(into hasher: inout Hasher) { |
| data.hash(into: &hasher) |
| } |
| |
| static func ==(_ lhs: LocalAddress, _ rhs: LocalAddress) -> Bool { |
| return lhs.data == rhs.data |
| } |
| } |
| |
| struct DarwinAddress: Hashable { |
| var data: Data |
| |
| init(_ data: Data) { |
| self.data = data |
| } |
| |
| init(_ localAddress: LocalAddress) { |
| switch localAddress { |
| case .ipv4(let sinAddr): |
| self = sinAddr.darwinAddress |
| |
| case .ipv6(let sinAddr): |
| self = sinAddr.darwinAddress |
| |
| case .other(let data): |
| self.data = data |
| } |
| } |
| } |
| |
| // MARK: SocketPort |
| |
| // A subclass of Port which can be used for remote |
| // message sending on all platforms. |
| |
| fileprivate func __NSFireSocketAccept(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ data: UnsafeRawPointer?, _ info: UnsafeMutableRawPointer?) { |
| guard let nonoptionalInfo = info else { |
| return |
| } |
| |
| let me = Unmanaged<SocketPort>.fromOpaque(nonoptionalInfo).takeUnretainedValue() |
| me.socketDidAccept(socket, type, address, data) |
| } |
| |
| fileprivate func __NSFireSocketData(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ data: UnsafeRawPointer?, _ info: UnsafeMutableRawPointer?) { |
| guard let nonoptionalInfo = info else { |
| return |
| } |
| |
| let me = Unmanaged<SocketPort>.fromOpaque(nonoptionalInfo).takeUnretainedValue() |
| me.socketDidReceiveData(socket, type, address, data) |
| } |
| |
| fileprivate func __NSFireSocketDatagram(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ data: UnsafeRawPointer?, _ info: UnsafeMutableRawPointer?) { |
| guard let nonoptionalInfo = info else { |
| return |
| } |
| |
| let me = Unmanaged<SocketPort>.fromOpaque(nonoptionalInfo).takeUnretainedValue() |
| me.socketDidReceiveDatagram(socket, type, address, data) |
| } |
| |
| open class SocketPort : Port { |
| struct SocketKind: Hashable { |
| var protocolFamily: Int32 |
| var socketType: Int32 |
| var `protocol`: Int32 |
| } |
| |
| struct Signature: Hashable { |
| var address: LocalAddress |
| |
| var protocolFamily: Int32 |
| var socketType: Int32 |
| var `protocol`: Int32 |
| |
| var socketKind: SocketKind { |
| get { |
| return SocketKind(protocolFamily: protocolFamily, socketType: socketType, protocol: `protocol`) |
| } |
| set { |
| self.protocolFamily = newValue.protocolFamily |
| self.socketType = newValue.socketType |
| self.protocol = newValue.protocol |
| } |
| } |
| |
| var darwinCompatibleDataRepresentation: Data? { |
| var data = Data() |
| let address = DarwinAddress(self.address).data |
| |
| guard let protocolFamilyByte = UInt8(exactly: protocolFamily), |
| let socketTypeByte = UInt8(exactly: socketType), |
| let protocolByte = UInt8(exactly: `protocol`), |
| let addressCountByte = UInt8(exactly: address.count) else { |
| return nil |
| } |
| |
| // TODO: Fixup namelen in Unix socket name. |
| |
| data.append(protocolFamilyByte) |
| data.append(socketTypeByte) |
| data.append(protocolByte) |
| data.append(addressCountByte) |
| data.append(contentsOf: address) |
| |
| return data |
| } |
| |
| init(address: LocalAddress, protocolFamily: Int32, socketType: Int32, protocol: Int32) { |
| self.address = address |
| self.protocolFamily = protocolFamily |
| self.socketType = socketType |
| self.protocol = `protocol` |
| } |
| |
| init?(darwinCompatibleDataRepresentation data: Data) { |
| guard data.count > 3 else { return nil } |
| let addressCountByte = data[offset: 3] |
| guard data.count == addressCountByte + 4 else { return nil } |
| |
| self.protocolFamily = Int32(data[offset: 0]) |
| self.socketType = Int32(data[offset: 1]) |
| self.protocol = Int32(data[offset: 2]) |
| // data[3] is addressCountByte, handled above. |
| self.address = LocalAddress(DarwinAddress(data[offset: 4...])) |
| } |
| } |
| |
| class Core { |
| fileprivate let isUniqued: Bool |
| fileprivate var signature: Signature! |
| |
| fileprivate let lock = NSLock() |
| fileprivate var connectors: [Signature: CFSocket] = [:] |
| fileprivate var loops: [ObjectIdentifier: (runLoop: CFRunLoop, modes: Set<RunLoop.Mode>)] = [:] |
| fileprivate var receiver: CFSocket? |
| fileprivate var data: [ObjectIdentifier: Data] = [:] |
| |
| init(isUniqued: Bool) { self.isUniqued = isUniqued } |
| } |
| |
| private var core: Core! |
| |
| public convenience override init() { |
| self.init(tcpPort: 0)! |
| } |
| |
| public convenience init?(tcpPort port: UInt16) { |
| var address = sockaddr_in(settingLength: ()) |
| address.sin_family = sa_family_t(AF_INET) |
| address.sin_port = in_port_t(port).bigEndian |
| withUnsafeMutableBytes(of: &address.sin_addr) { (buffer) in |
| withUnsafeBytes(of: in_addr_t(INADDR_ANY).bigEndian) { buffer.copyMemory(from: $0) } |
| } |
| |
| let data = withUnsafeBytes(of: address) { Data($0) } |
| |
| self.init(protocolFamily: PF_INET, socketType: SOCK_STREAM, protocol: IPPROTO_TCP, address: data) |
| } |
| |
| private func createNonuniquedCore(from socket: CFSocket, protocolFamily family: Int32, socketType type: Int32, protocol: Int32) { |
| self.core = Core(isUniqued: false) |
| let address = CFSocketCopyAddress(socket)._swiftObject |
| core.signature = Signature(address: LocalAddress(address), protocolFamily: family, socketType: type, protocol: `protocol`) |
| core.receiver = socket |
| } |
| |
| public init?(protocolFamily family: Int32, socketType type: Int32, protocol: Int32, address: Data) { |
| super.init() |
| |
| var context = CFSocketContext() |
| context.info = Unmanaged.passUnretained(self).toOpaque() |
| var s: CFSocket |
| if type == SOCK_STREAM { |
| s = CFSocketCreate(nil, family, type, `protocol`, CFOptionFlags(kCFSocketAcceptCallBack), __NSFireSocketAccept, &context) |
| } else { |
| s = CFSocketCreate(nil, family, type, `protocol`, CFOptionFlags(kCFSocketDataCallBack), __NSFireSocketDatagram, &context) |
| } |
| |
| if CFSocketSetAddress(s, address._cfObject) != CFSocketError(0) { |
| return nil |
| } |
| |
| createNonuniquedCore(from: s, protocolFamily: family, socketType: type, protocol: `protocol`) |
| } |
| |
| public init?(protocolFamily family: Int32, socketType type: Int32, protocol: Int32, socket sock: SocketNativeHandle) { |
| super.init() |
| |
| var context = CFSocketContext() |
| context.info = Unmanaged.passUnretained(self).toOpaque() |
| var s: CFSocket |
| if type == SOCK_STREAM { |
| s = CFSocketCreateWithNative(nil, CFSocketNativeHandle(sock), CFOptionFlags(kCFSocketAcceptCallBack), __NSFireSocketAccept, &context) |
| } else { |
| s = CFSocketCreateWithNative(nil, CFSocketNativeHandle(sock), CFOptionFlags(kCFSocketDataCallBack), __NSFireSocketDatagram, &context) |
| } |
| |
| createNonuniquedCore(from: s, protocolFamily: family, socketType: type, protocol: `protocol`) |
| } |
| |
| public convenience init?(remoteWithTCPPort port: UInt16, host hostName: String?) { |
| let host = Host(name: hostName?.isEmpty == true ? nil : hostName) |
| var addresses: [String] = hostName == nil ? [] : [hostName!] |
| addresses.append(contentsOf: host.addresses) |
| |
| // Prefer IPv4 addresses, as Darwin does: |
| for address in addresses { |
| var inAddr = in_addr() |
| if inet_pton(AF_INET, address, &inAddr) == 1 { |
| var sinAddr = sockaddr_in(settingLength: ()) |
| sinAddr.sin_family = sa_family_t(AF_INET) |
| sinAddr.sin_port = port.bigEndian |
| sinAddr.sin_addr = inAddr |
| |
| let data = withUnsafeBytes(of: sinAddr) { Data($0) } |
| self.init(remoteWithProtocolFamily: PF_INET, socketType: SOCK_STREAM, protocol: IPPROTO_TCP, address: data) |
| return |
| } |
| } |
| for address in addresses { |
| var in6Addr = in6_addr() |
| if inet_pton(AF_INET6, address, &in6Addr) == 1 { |
| var sinAddr = sockaddr_in6(settingLength: ()) |
| sinAddr.sin6_family = sa_family_t(AF_INET6) |
| sinAddr.sin6_port = port.bigEndian |
| sinAddr.sin6_addr = in6Addr |
| |
| let data = withUnsafeBytes(of: sinAddr) { Data($0) } |
| self.init(remoteWithProtocolFamily: PF_INET, socketType: SOCK_STREAM, protocol: IPPROTO_TCP, address: data) |
| return |
| } |
| } |
| |
| if hostName != nil { |
| return nil |
| } |
| |
| // Lookup on local host. |
| var sinAddr = sockaddr_in(settingLength: ()) |
| sinAddr.sin_family = sa_family_t(AF_INET) |
| sinAddr.sin_port = port.bigEndian |
| withUnsafeMutableBytes(of: &sinAddr.sin_addr) { (buffer) in |
| withUnsafeBytes(of: in_addr_t(INADDR_LOOPBACK).bigEndian) { buffer.copyMemory(from: $0) } |
| } |
| let data = withUnsafeBytes(of: sinAddr) { Data($0) } |
| self.init(remoteWithProtocolFamily: PF_INET, socketType: SOCK_STREAM, protocol: IPPROTO_TCP, address: data) |
| } |
| |
| private static let remoteSocketCoresLock = NSLock() |
| private static var remoteSocketCores: [Signature: Core] = [:] |
| |
| static private func retainedCore(for signature: Signature) -> Core { |
| return SocketPort.remoteSocketCoresLock.synchronized { |
| if let core = SocketPort.remoteSocketCores[signature] { |
| return core |
| } else { |
| let core = Core(isUniqued: true) |
| core.signature = signature |
| |
| SocketPort.remoteSocketCores[signature] = core |
| |
| return core |
| } |
| } |
| } |
| |
| public init(remoteWithProtocolFamily family: Int32, socketType type: Int32, protocol: Int32, address: Data) { |
| let signature = Signature(address: LocalAddress(address), protocolFamily: family, socketType: type, protocol: `protocol`) |
| self.core = SocketPort.retainedCore(for: signature) |
| } |
| |
| private init(remoteWithSignature signature: Signature) { |
| self.core = SocketPort.retainedCore(for: signature) |
| } |
| |
| deinit { |
| // On Darwin, .invalidate() is invoked on the _last_ release, immediately before deinit; we cannot do that here. |
| invalidate() |
| } |
| |
| open override func invalidate() { |
| guard var core = core else { return } |
| self.core = nil |
| |
| var signatureToRemove: Signature? |
| |
| if core.isUniqued { |
| if isKnownUniquelyReferenced(&core) { |
| signatureToRemove = core.signature // No need to lock here — this is the only reference to the core. |
| } else { |
| return // Do not clean up the contents of this core yet. |
| } |
| } |
| |
| if let receiver = core.receiver, CFSocketIsValid(receiver) { |
| for connector in core.connectors.values { |
| CFSocketInvalidate(connector) |
| } |
| |
| CFSocketInvalidate(receiver) |
| |
| // Invalidation notifications are only sent for local (receiver != nil) ports. |
| NotificationCenter.default.post(name: Port.didBecomeInvalidNotification, object: self) |
| } |
| |
| if let signatureToRemove = signatureToRemove { |
| SocketPort.remoteSocketCoresLock.synchronized { |
| _ = SocketPort.remoteSocketCores.removeValue(forKey: signatureToRemove) |
| } |
| } |
| } |
| |
| open override var isValid: Bool { |
| return core != nil |
| } |
| |
| weak var _delegate: PortDelegate? |
| open override func setDelegate(_ anObject: PortDelegate?) { |
| _delegate = anObject |
| } |
| open override func delegate() -> PortDelegate? { |
| return _delegate |
| } |
| |
| open var protocolFamily: Int32 { |
| return core.signature.protocolFamily |
| } |
| |
| open var socketType: Int32 { |
| return core.signature.socketType |
| } |
| |
| open var `protocol`: Int32 { |
| return core.signature.protocol |
| } |
| |
| open var address: Data { |
| return core.signature.address.data |
| } |
| |
| open var socket: SocketNativeHandle { |
| return SocketNativeHandle(CFSocketGetNative(core.receiver)) |
| } |
| |
| open override func schedule(in runLoop: RunLoop, forMode mode: RunLoop.Mode) { |
| let loop = runLoop.getCFRunLoop() |
| let loopKey = ObjectIdentifier(loop) |
| |
| core.lock.synchronized { |
| guard let receiver = core.receiver, CFSocketIsValid(receiver) else { return } |
| |
| var modes = core.loops[loopKey]?.modes ?? [] |
| |
| guard !modes.contains(mode) else { return } |
| modes.insert(mode) |
| |
| core.loops[loopKey] = (loop, modes) |
| |
| if let source = CFSocketCreateRunLoopSource(nil, receiver, 600) { |
| CFRunLoopAddSource(loop, source, mode.rawValue._cfObject) |
| } |
| |
| for socket in core.connectors.values { |
| if let source = CFSocketCreateRunLoopSource(nil, socket, 600) { |
| CFRunLoopAddSource(loop, source, mode.rawValue._cfObject) |
| } |
| } |
| } |
| } |
| |
| open override func remove(from runLoop: RunLoop, forMode mode: RunLoop.Mode) { |
| let loop = runLoop.getCFRunLoop() |
| let loopKey = ObjectIdentifier(loop) |
| |
| core.lock.synchronized { |
| guard let receiver = core.receiver, CFSocketIsValid(receiver) else { return } |
| |
| let modes = core.loops[loopKey]?.modes ?? [] |
| guard modes.contains(mode) else { return } |
| |
| if modes.count == 1 { |
| core.loops.removeValue(forKey: loopKey) |
| } else { |
| core.loops[loopKey]?.modes.remove(mode) |
| } |
| |
| if let source = CFSocketCreateRunLoopSource(nil, receiver, 600) { |
| CFRunLoopRemoveSource(loop, source, mode.rawValue._cfObject) |
| } |
| |
| for socket in core.connectors.values { |
| if let source = CFSocketCreateRunLoopSource(nil, socket, 600) { |
| CFRunLoopRemoveSource(loop, source, mode.rawValue._cfObject) |
| } |
| } |
| } |
| } |
| |
| // On Darwin/ObjC Foundation, invoking initRemote… will return an existing SocketPort from a factory initializer if a port with that signature already exists. |
| // We cannot do that in Swift; we return instead different objects that share a 'core' (their state), so that invoking methods on either will affect the other. To keep maintain a better illusion, unlike Darwin, we redefine equality so that s1 == s2 for objects that have the same core, even though s1 !== s2 (this we cannot fix). |
| // This allows e.g. collections where SocketPorts are keys or entries to continue working the same way they do on Darwin when remote socket ports are encountered. |
| |
| open override var hash: Int { |
| var hasher = Hasher() |
| ObjectIdentifier(core).hash(into: &hasher) |
| return hasher.finalize() |
| } |
| |
| open override func isEqual(_ object: Any?) -> Bool { |
| guard let socketPort = object as? SocketPort else { return false } |
| return core === socketPort.core |
| } |
| |
| // Sending and receiving: |
| |
| fileprivate func socketDidAccept(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ data: UnsafeRawPointer?) { |
| guard let handle = data?.assumingMemoryBound(to: SocketNativeHandle.self), |
| let address = address else { |
| return |
| } |
| |
| var context = CFSocketContext() |
| context.info = Unmanaged.passUnretained(self).toOpaque() |
| |
| guard let child = CFSocketCreateWithNative(nil, CFSocketNativeHandle(handle.pointee), CFOptionFlags(kCFSocketDataCallBack), __NSFireSocketData, &context) else { |
| return |
| } |
| |
| var signature = core.signature! |
| signature.address = LocalAddress(address._swiftObject) |
| core.lock.synchronized { |
| core.connectors[signature] = child |
| addToLoopsAssumingLockHeld(child) |
| } |
| } |
| |
| private func addToLoopsAssumingLockHeld(_ socket: CFSocket) { |
| guard let source = CFSocketCreateRunLoopSource(nil, socket, 600) else { |
| return |
| } |
| |
| for loop in core.loops.values { |
| for mode in loop.modes { |
| CFRunLoopAddSource(loop.runLoop, source, mode.rawValue._cfObject) |
| } |
| } |
| } |
| |
| private static let magicNumber: UInt32 = 0xD0CF50C0 |
| |
| private enum ComponentType: UInt32 { |
| case data = 1 |
| case port = 2 |
| } |
| |
| fileprivate func socketDidReceiveData(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ dataPointer: UnsafeRawPointer?) { |
| guard let socket = socket, |
| let dataPointer = dataPointer else { return } |
| let socketKey = ObjectIdentifier(socket) |
| |
| let peerAddress = CFSocketCopyPeerAddress(socket)._swiftObject |
| |
| let lock = core.lock |
| lock.lock() // We may briefly release the lock during the loop below, and it's unlocked at the end ⬇ |
| |
| let data = Unmanaged<CFData>.fromOpaque(dataPointer).takeUnretainedValue()._swiftObject |
| |
| if data.count == 0 { |
| core.data.removeValue(forKey: socketKey) |
| let keysToRemove = core.connectors.keys.filter { core.connectors[$0] === socket } |
| for key in keysToRemove { |
| core.connectors.removeValue(forKey: key) |
| } |
| } else { |
| var storedData: Data |
| if let currentData = core.data[socketKey] { |
| storedData = currentData |
| storedData.append(contentsOf: data) |
| } else { |
| storedData = data |
| } |
| |
| var keepGoing = true |
| while keepGoing && storedData.count > 8 { |
| let preamble = storedData.withUnsafeBytes { $0.load(fromByteOffset: 0, as: UInt32.self) }.bigEndian |
| let messageLength = storedData.withUnsafeBytes { $0.load(fromByteOffset: 4, as: UInt32.self) }.bigEndian |
| |
| if preamble == SocketPort.magicNumber && messageLength > 8 { |
| if storedData.count >= messageLength { |
| let messageEndIndex = storedData.index(storedData.startIndex, offsetBy: Int(messageLength)) |
| |
| let toStore = storedData[offset: messageEndIndex...] |
| if toStore.isEmpty { |
| core.data.removeValue(forKey: socketKey) |
| } else { |
| core.data[socketKey] = toStore |
| } |
| storedData.removeSubrange(messageEndIndex...) |
| |
| lock.unlock() // Briefly release the lock ⬆ |
| handleMessage(storedData, from: peerAddress, socket: socket) |
| lock.lock() // Retake for the remainder ⬇ |
| |
| if let newStoredData = core.data[socketKey] { |
| storedData = newStoredData |
| } else { |
| keepGoing = false |
| } |
| } else { |
| keepGoing = false |
| } |
| } else { |
| // Got message without proper header; delete it. |
| core.data.removeValue(forKey: socketKey) |
| keepGoing = false |
| } |
| } |
| } |
| |
| lock.unlock() // Release lock from above ⬆ |
| } |
| |
| fileprivate func socketDidReceiveDatagram(_ socket: CFSocket?, _ type: CFSocketCallBackType, _ address: CFData?, _ data: UnsafeRawPointer?) { |
| guard let address = address?._swiftObject, |
| let data = data else { |
| return |
| } |
| |
| let actualData = Unmanaged<CFData>.fromOpaque(data).takeUnretainedValue()._swiftObject |
| self.handleMessage(actualData, from: address, socket: nil) |
| } |
| |
| private enum Structure { |
| static let offsetOfMagicNumber = 0 // a UInt32 |
| static let offsetOfMessageLength = 4 // a UInt32 |
| static let offsetOfMsgId = 8 // a UInt32 |
| |
| static let offsetOfSignature = 12 |
| static let sizeOfSignatureHeader = 4 |
| static let offsetOfSignatureAddressLength = 15 |
| } |
| |
| private func handleMessage(_ message: Data, from address: Data, socket: CFSocket?) { |
| guard message.count > 24, let delegate = delegate() else { return } |
| let portMessage = message.withUnsafeBytes { (messageBuffer) -> PortMessage? in |
| guard SocketPort.magicNumber == messageBuffer.load(fromByteOffset: Structure.offsetOfMagicNumber, as: UInt32.self).bigEndian, |
| message.count == messageBuffer.load(fromByteOffset: Structure.offsetOfMessageLength, as: UInt32.self).bigEndian else { |
| return nil |
| } |
| |
| let msgId = messageBuffer.load(fromByteOffset: Structure.offsetOfMsgId, as: UInt32.self).bigEndian |
| let signatureLength = Int(messageBuffer[Structure.offsetOfSignatureAddressLength]) + Structure.sizeOfSignatureHeader // corresponds to the sin_len/sin6_len field inside the signature. |
| var signatureBytes = Data(UnsafeRawBufferPointer(rebasing: messageBuffer[12 ..< min(12 + signatureLength, message.count - 20)])) |
| |
| if signatureBytes.count >= 12 && signatureBytes[offset: 5] == AF_INET && address.count >= 8 { |
| if address[offset: 1] == AF_INET { |
| signatureBytes.withUnsafeMutableBytes { (mutableSignatureBuffer) in |
| address.withUnsafeBytes { (address) in |
| mutableSignatureBuffer.baseAddress?.advanced(by: 8).copyMemory(from: address.baseAddress!.advanced(by: 4), byteCount: 4) |
| } |
| } |
| } |
| } else if signatureBytes.count >= 32 && signatureBytes[offset: 5] == AF_INET6 && address.count >= 28 { |
| if address[offset: 1] == AF_INET6 { |
| signatureBytes.withUnsafeMutableBytes { (mutableSignatureBuffer) in |
| address.withUnsafeBytes { (address) in |
| mutableSignatureBuffer.baseAddress?.advanced(by: 8).copyMemory(from: address.baseAddress!.advanced(by: 4), byteCount: 24) |
| } |
| } |
| } |
| } |
| |
| guard let signature = Signature(darwinCompatibleDataRepresentation: signatureBytes) else { |
| return nil |
| } |
| |
| if let socket = socket { |
| core.lock.synchronized { |
| if let existing = core.connectors[signature], CFSocketIsValid(existing) { |
| core.connectors[signature] = socket |
| } |
| } |
| } |
| |
| let sender = SocketPort(remoteWithSignature: signature) |
| var index = messageBuffer.startIndex + signatureBytes.count + 20 |
| var components: [AnyObject] = [] |
| while index < messageBuffer.endIndex { |
| var word1: UInt32 = 0 |
| var word2: UInt32 = 0 |
| |
| // The amount of data prior to this point may mean that these reads are unaligned; copy instead. |
| withUnsafeMutableBytes(of: &word1) { (destination) -> Void in |
| messageBuffer.copyBytes(to: destination, from: Int(index - 8) ..< Int(index - 4)) |
| } |
| withUnsafeMutableBytes(of: &word2) { (destination) -> Void in |
| messageBuffer.copyBytes(to: destination, from: Int(index - 4) ..< Int(index)) |
| } |
| |
| let componentType = word1.bigEndian |
| let componentLength = word2.bigEndian |
| |
| let componentEndIndex = min(index + Int(componentLength), messageBuffer.endIndex) |
| let componentData = Data(messageBuffer[index ..< componentEndIndex]) |
| |
| if let type = ComponentType(rawValue: componentType) { |
| switch type { |
| case .data: |
| components.append(componentData._nsObject) |
| |
| case .port: |
| if let signature = Signature(darwinCompatibleDataRepresentation: componentData) { |
| components.append(SocketPort(remoteWithSignature: signature)) |
| } |
| } |
| } |
| |
| guard messageBuffer.formIndex(&index, offsetBy: componentData.count + 8, limitedBy: messageBuffer.endIndex) else { |
| break |
| } |
| } |
| |
| let message = PortMessage(sendPort: sender, receivePort: self, components: components) |
| message.msgid = msgId |
| return message |
| } |
| |
| if let portMessage = portMessage { |
| delegate.handle(portMessage) |
| } |
| } |
| |
| open override func sendBeforeDate(_ limitDate: Date, components: NSMutableArray?, from receivePort: Port?, reserved headerSpaceReserved: Int) -> Bool { |
| sendBeforeDate(limitDate, msgid: 0, components: components, from: receivePort, reserved: headerSpaceReserved) |
| } |
| |
| open override func sendBeforeDate(_ limitDate: Date, msgid msgID: Int, components: NSMutableArray?, from receivePort: Port?, reserved headerSpaceReserved: Int) -> Bool { |
| guard let sender = sendingSocket(for: self, before: limitDate.timeIntervalSinceReferenceDate), |
| let signature = core.signature.darwinCompatibleDataRepresentation else { |
| return false |
| } |
| |
| let magicNumber: UInt32 = SocketPort.magicNumber.bigEndian |
| var outLength: UInt32 = 0 |
| let messageNumber = UInt32(msgID).bigEndian |
| |
| var data = Data() |
| withUnsafeBytes(of: magicNumber) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: outLength) { data.append(contentsOf: $0) } // Reserve space for it. |
| withUnsafeBytes(of: messageNumber) { data.append(contentsOf: $0) } |
| data.append(contentsOf: signature) |
| |
| for component in components?.allObjects ?? [] { |
| var componentData: Data |
| var componentType: ComponentType |
| |
| switch component { |
| case let component as Data: |
| componentData = component |
| componentType = .data |
| |
| case let component as NSData: |
| componentData = component._swiftObject |
| componentType = .data |
| |
| case let component as SocketPort: |
| guard let signature = component.core.signature.darwinCompatibleDataRepresentation else { |
| return false |
| } |
| componentData = signature |
| componentType = .port |
| |
| default: |
| return false |
| } |
| |
| var componentLength = UInt32(componentData.count).bigEndian |
| var componentTypeRawValue = UInt32(componentType.rawValue).bigEndian |
| |
| withUnsafeBytes(of: &componentTypeRawValue) { data.append(contentsOf: $0) } |
| withUnsafeBytes(of: &componentLength) { data.append(contentsOf: $0) } |
| data.append(contentsOf: componentData) |
| } |
| |
| outLength = UInt32(data.count).bigEndian |
| withUnsafeBytes(of: outLength) { (lengthBytes) -> Void in |
| data.withUnsafeMutableBytes { (bytes) -> Void in |
| bytes.baseAddress!.advanced(by: 4).copyMemory(from: lengthBytes.baseAddress!, byteCount: lengthBytes.count) |
| } |
| } |
| |
| let result = CFSocketSendData(sender, self.address._cfObject, data._cfObject, limitDate.timeIntervalSinceNow) |
| |
| switch result { |
| case kCFSocketSuccess: |
| return true |
| case kCFSocketError: fallthrough |
| case kCFSocketTimeout: |
| return false |
| |
| default: |
| fatalError("Unknown result of sending through a socket: \(result)") |
| } |
| } |
| |
| private static let maximumTimeout: TimeInterval = 86400 |
| |
| private static let sendingSocketsLock = NSLock() |
| private static var sendingSockets: [SocketKind: CFSocket] = [:] |
| |
| private func sendingSocket(for port: SocketPort, before time: TimeInterval) -> CFSocket? { |
| let signature = port.core.signature! |
| let socketKind = signature.socketKind |
| |
| var context = CFSocketContext() |
| context.info = Unmanaged.passUnretained(self).toOpaque() |
| |
| return core.lock.synchronized { |
| if let connector = core.connectors[signature], CFSocketIsValid(connector) { |
| return connector |
| } else { |
| if signature.socketType == SOCK_STREAM { |
| if let connector = CFSocketCreate(nil, socketKind.protocolFamily, socketKind.socketType, socketKind.protocol, CFOptionFlags(kCFSocketDataCallBack), __NSFireSocketData, &context), CFSocketIsValid(connector) { |
| var timeout = time - Date.timeIntervalSinceReferenceDate |
| if timeout < 0 || timeout >= SocketPort.maximumTimeout { |
| timeout = 0 |
| } |
| |
| if CFSocketIsValid(connector) && CFSocketConnectToAddress(connector, address._cfObject, timeout) == CFSocketError(0) { |
| core.connectors[signature] = connector |
| self.addToLoopsAssumingLockHeld(connector) |
| return connector |
| } else { |
| CFSocketInvalidate(connector) |
| } |
| } |
| } else { |
| return SocketPort.sendingSocketsLock.synchronized { |
| var result: CFSocket? |
| |
| if signature.socketKind == core.signature.socketKind, |
| let receiver = core.receiver, CFSocketIsValid(receiver) { |
| result = receiver |
| } else if let socket = SocketPort.sendingSockets[socketKind], CFSocketIsValid(socket) { |
| result = socket |
| } |
| |
| if result == nil, |
| let sender = CFSocketCreate(nil, socketKind.protocolFamily, socketKind.socketType, socketKind.protocol, CFOptionFlags(kCFSocketNoCallBack), nil, &context), CFSocketIsValid(sender) { |
| |
| SocketPort.sendingSockets[socketKind] = sender |
| result = sender |
| } |
| |
| return result |
| } |
| } |
| } |
| |
| return nil |
| } |
| } |
| } |
| |
| fileprivate extension Data { |
| subscript(offset value: Int) -> Data.Element { |
| return self[self.index(self.startIndex, offsetBy: value)] |
| } |
| |
| subscript(offset range: Range<Int>) -> Data.SubSequence { |
| return self[self.index(self.startIndex, offsetBy: range.lowerBound) ..< self.index(self.startIndex, offsetBy: range.upperBound)] |
| } |
| |
| subscript(offset range: ClosedRange<Int>) -> Data.SubSequence { |
| return self[self.index(self.startIndex, offsetBy: range.lowerBound) ... self.index(self.startIndex, offsetBy: range.upperBound)] |
| } |
| |
| subscript(offset range: PartialRangeFrom<Int>) -> Data.SubSequence { |
| return self[self.index(self.startIndex, offsetBy: range.lowerBound)...] |
| } |
| |
| subscript(offset range: PartialRangeUpTo<Int>) -> Data.SubSequence { |
| return self[...self.index(self.startIndex, offsetBy: range.upperBound)] |
| } |
| } |