//===-- CompilerRuntime.swift ---------------------------------*- swift -*-===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
//
// This file defines the Swift runtime support for TensorFlow computation.
//
// TODO:
// - Support async on platforms other than Linux and FreeBSD.
// - Revisit the concurrency model and see if Dispatch can be built without
// Foundation.
// - Detach the compiler runtime from the TensorFlow standard library into a
//   separate TensorFlowRuntime module.
//
// NOTE:
// - Much code is intentionally un-Swifty because TF/TFE support is likely to
//   change. Variable pairs with versions for TF/TFE would be better represented
//   as an enum, but since the TFE runtime is soon to be removed, it doesn't
//   make sense to temporarily change the code.
// - Code should be made Swifty after support is stabilized and the churn rate
//   is lower (e.g. variable pairs should be rewritten as enums).
//
//===----------------------------------------------------------------------===//
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
import Darwin
#else
import Glibc
#endif
import CTensorFlow
// If `serverAddress` is nil, use local session (good for forge testing).
//
// FIXME: We need @_transparent here because deabstraction isn't inlining this
// function. We should inline whenever a callee contains tensor ops, not only
// when it takes and returns a TensorFlow value.
@_transparent
public func enableTPU(serverAddress: String? = nil, infeed: Bool = true) {
_RuntimeConfig.executionMode = .tpu
if let serverAddress = serverAddress {
_RuntimeConfig.session = .remote(grpcAddress: serverAddress)
}
#tfop("tfc.configureTPU", enableInfeed: infeed) as Void
}
// FIXME: Extend the interface to support multiple GPU devices, and unify it
// with enableTPU() above.
@_transparent
public func enableGPU() {
_RuntimeConfig.executionMode = .gpu
#tfop("tfc.configureGPU") as Void
}
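// A minimal usage sketch for the two functions above (the server address is
// hypothetical; call these before executing any tensor ops):
//
//   enableTPU(serverAddress: "grpc://10.0.0.2:8470", infeed: true)
//   // -- or, on a --config=cuda build with a qualified GPU --
//   enableGPU()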
@_fixed_layout
public enum _ExecutionMode : Equatable {
/// Classical TF interpreter backend, on CPU.
case cpu
/// Classical TF interpreter backend, on GPU.
case gpu
/// TPU backend.
case tpu
/// XLA jit-compilation backend (will use GPU when available, and otherwise
/// CPU).
case xla
public var isTPU: Bool {
switch self {
case .tpu: return true
default: return false
}
}
}
/// The configuration for the compiler runtime.
// TODO(hongm): Revisit the longer-term design.
@_fixed_layout
public enum _RuntimeConfig {
/// When true, run the entire tensor computation in
/// _TFCStartTensorComputation(), instead of running it on a separate thread.
/// - Note: Set to true only for debugging purposes.
static public var usesSynchronousExecution = false
  /// Setting the value to `.gpu` requires that the Swift compiler and model
  /// code be built with `--config=cuda`. If there are multiple GPUs, an
  /// arbitrary one is chosen.
static public var executionMode: _ExecutionMode = .cpu {
willSet {
debugLog("About to set executionMode to \(newValue)")
guard newValue == .gpu else { return }
guard _ExecutionContext.global.gpuDeviceName != nil else {
fatalError("""
GPU must be available when _RuntimeConfig.executionMode is set to \
.gpu -- did you compile with --config=cuda and have a qualified GPU?
""")
}
}
}
/// Specifies whether the TensorFlow computation runs in a local (in-process)
/// session, or a remote session with the specified server address (must start
/// with "grpc://" in that case).
@_fixed_layout
public enum RuntimeSession {
case local
case remote(grpcAddress: String)
}
static public var session: RuntimeSession = .local
/// When true, prints various debug messages on the runtime state.
///
  /// If the value is true when running tensor computation for the first time
  /// in the process, the INFO log from TensorFlow will also be printed.
static public var printsDebugLog = false
/// Specifies the verbose log level in TensorFlow; a higher level prints out
/// more log. Only meaningful when `printsDebugLog` is true, and must be
/// within [0, 4] in that case.
static public var tensorflowVerboseLogLevel: Int32 = 0 {
willSet {
debugLog("About to set tensorflowVerboseLogLevel to \(newValue)")
guard newValue >= 0 && newValue <= 4 else {
fatalError("Invalid tensorflowVerboseLogLevel value \(newValue)")
}
}
}
}
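// A configuration sketch for the runtime knobs above (values are illustrative,
// not defaults; set them before the first tensor computation runs):
//
//   _RuntimeConfig.printsDebugLog = true
//   _RuntimeConfig.tensorflowVerboseLogLevel = 2
//   _RuntimeConfig.session = .remote(grpcAddress: "grpc://localhost:2222")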
private func configureRuntimeFromEnvironment() {
if let value = getenv("SWIFT_TENSORFLOW_ENABLE_DEBUG_LOGGING"),
String(cString: value).lowercased() == "true" {
_RuntimeConfig.printsDebugLog = true
debugLog("Turning on debug logging from env.")
}
if let value = getenv("SWIFT_TENSORFLOW_VERBOSE_LOG_LEVEL") {
guard var verboseLevel = Int32(String(cString: value)) else {
fatalError("SWIFT_TENSORFLOW_VERBOSE_LOG_LEVEL must take an int value.")
}
if verboseLevel > 4 {
verboseLevel = 4
}
_RuntimeConfig.tensorflowVerboseLogLevel = verboseLevel
debugLog("Setting TF logging verbose level to \(verboseLevel) from env.")
}
if let value = getenv("SWIFT_TENSORFLOW_USE_TPU_INFEED"),
String(cString: value).lowercased() == "true" {
_RuntimeConfig.executionMode = .tpu
debugLog("Setting TPU execution with infeed from env.")
}
if let value = getenv("SWIFT_TENSORFLOW_SYNC_EXECUTION"),
String(cString: value).lowercased() == "true" {
_RuntimeConfig.usesSynchronousExecution = true
debugLog("Using sync execution from env.")
}
if let value = getenv("SWIFT_TENSORFLOW_SERVER_ADDRESS") {
let address = String(cString: value)
debugLog("env var SWIFT_TENSORFLOW_SERVER_ADDRESS has value \(address).")
if address == "local" {
_RuntimeConfig.session = .local
debugLog("Using local TF session.")
return
}
    guard address.hasPrefix("grpc://") else {
fatalError("SWIFT_TENSORFLOW_SERVER_ADDRESS must start with 'grpc://'.")
}
_RuntimeConfig.session = .remote(grpcAddress: address)
debugLog("Setting TF server address to \(address) from env.")
}
}
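// A sketch of exercising the environment knobs above (normally these variables
// are set in the launching shell; the runtime reads them when the global
// execution context is initialized):
//
//   setenv("SWIFT_TENSORFLOW_ENABLE_DEBUG_LOGGING", "true", /*overwrite*/ 1)
//   setenv("SWIFT_TENSORFLOW_VERBOSE_LOG_LEVEL", "3", 1)
//   setenv("SWIFT_TENSORFLOW_SERVER_ADDRESS", "grpc://localhost:2222", 1)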
/// Initialize the TPU system.
/// - Note: This should be called only once.
/// - Precondition: The given session must contain the given graph.
// TODO(b/77572335): Reassess how to reset TPU after execution error.
private func initializeTPU(withSession session: CTFSession, graph: CTFGraph,
status: CTFStatus) {
let configOp = TF_GraphOperationByName(graph, "ConfigureDistributedTPU")
internalConsistencyCheck(configOp != nil)
var configNode = TF_Output(oper: configOp, index: 0)
var dummyOutput: CTensor?
TF_SessionRun(session, nil, nil, nil, 0, &configNode, &dummyOutput, 1, nil,
0, nil, status)
checkOk(status)
TF_DeleteTensor(dummyOutput)
}
/// The host of any tensor computation.
@_fixed_layout
public final class _ExecutionContext {
/// Global context storing all available devices, loaded functions, etc.
public static let global: _ExecutionContext = _ExecutionContext()
public let cpuDeviceName: String
/// Only set when there is a usable GPU.
public let gpuDeviceName: String?
/// The TFE_Context object.
private var cContext: CTFEContext
// NOTE: the following properties are intentionally not implemented as an enum
// due to high churn, *please do not refactor for Swiftiness*.
internal typealias ProgramInstance = (graph: CTFGraph, session: CTFSession)
private var loadedTFPrograms: [UnsafeRawPointer : ProgramInstance] = [:]
/// The status for checking TensorFlow errors.
private let status: CTFStatus = TF_NewStatus()
/// The mutex for preventing potential concurrent access.
private var mutex: pthread_mutex_t = pthread_mutex_t()
/// Initializes a new execution context by initializing available devices.
@_versioned
init() {
configureRuntimeFromEnvironment()
debugLog("Initializing global context.")
    // Initialize the TF runtime exactly once. Only affects local execution
    // (when _RuntimeConfig.session is set to .local).
InitTensorFlowRuntime(_RuntimeConfig.printsDebugLog ? 1 : 0,
_RuntimeConfig.tensorflowVerboseLogLevel)
guard let opts = TFE_NewContextOptions() else {
fatalError("ContextOptions object can never be nil.")
}
let ctx = TFE_NewContext(opts, status)
checkOk(status)
self.cContext = ctx!
TFE_DeleteContextOptions(opts)
checkOk(status)
// Initialize GPU device.
    // While the code here is only needed when _RuntimeConfig.executionMode is
    // set to .gpu, running it in all code paths helps keep things simple
    // (e.g. it ensures that the cpuDeviceName property is always set).
let devices = TFE_ContextListDevices(cContext, status)
checkOk(status)
defer { TF_DeleteDeviceList(devices!) }
let deviceCount = TF_DeviceListCount(devices!)
debugLog("There are \(deviceCount) devices.")
var deviceNames: [String : String] = [:]
for deviceId in 0..<deviceCount {
let cDeviceName = TF_DeviceListName(devices, deviceId, status)
checkOk(status)
let deviceName = String(cString: cDeviceName!)
let cDeviceType = TF_DeviceListType(devices, deviceId, status)
checkOk(status)
let deviceType = String(cString: cDeviceType!)
debugLog(
"Device \(deviceId) has type \(deviceType) and name \(deviceName)."
)
deviceNames[deviceType] = deviceName
}
guard let cpuDeviceName = deviceNames["CPU"] else {
fatalError("CPU should always be an available device.")
}
self.cpuDeviceName = cpuDeviceName
    // This must be non-nil when _RuntimeConfig.executionMode is set to .gpu,
    // as enforced by the willSet check for that property.
self.gpuDeviceName = deviceNames["GPU"]
// Initialize the mutex.
pthread_mutex_init(&mutex, nil)
}
deinit {
debugLog("De-initializing global context.")
// Delete all loaded programs.
for (graph, session) in loadedTFPrograms.values {
TF_DeleteSession(session, status)
checkOk(status)
TF_DeleteGraph(graph)
}
TFE_DeleteContext(cContext, status)
checkOk(status)
TF_DeleteStatus(status)
pthread_mutex_destroy(&mutex)
}
}
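// Usage sketch: the global context is created lazily on first access, which
// also runs configureRuntimeFromEnvironment() and device discovery:
//
//   let context = _ExecutionContext.global
//   debugLog("CPU device: \(context.cpuDeviceName)")
//   if let gpu = context.gpuDeviceName { debugLog("GPU device: \(gpu)") }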
internal extension _ExecutionContext {
/// Synchronously execute the body, preventing asynchronous computation from
/// corrupting the context data.
private func sync<Result>(
execute body: () throws -> Result
) rethrows -> Result {
let lockStatus = pthread_mutex_lock(&mutex)
internalConsistencyCheck(lockStatus == 0)
defer {
let unlockStatus = pthread_mutex_unlock(&mutex)
internalConsistencyCheck(unlockStatus == 0)
// Create a cancellation point.
pthread_testcancel()
}
return try body()
}
/// Invokes the given closure with the underlying C context. Access to the C
/// context is guaranteed to be thread-safe within the closure.
func withMutableCContext<Result>(
execute body: (CTFEContext) throws -> Result
) rethrows -> Result {
return try sync {
try body(cContext)
}
}
}
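// Usage sketch for the wrapper above (hypothetical caller): all access to the
// underlying TFE context is funneled through it so that the mutex is held.
//
//   _ExecutionContext.global.withMutableCContext { ctx in
//     // ... issue TFE_* calls against `ctx` while holding the lock ...
//   }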
fileprivate extension _ExecutionContext {
/// Returns a cached session along with its graph if it exists, or creates a
/// new one and caches it.
func session(
forProgram programByteAddress: UnsafeRawPointer,
programByteCount: Int
) -> ProgramInstance {
return sync {
// If a program instance for this program is already cached, use that.
if let instance = loadedTFPrograms[programByteAddress] {
return instance
}
// Otherwise, load the graph, create a session, and cache them.
debugLog("Loading a tensor program as a TF graph.")
let newGraph = TF_NewGraph()!
// TensorFlow loads things through TF_Buffer. Create one that avoids
// redundantly copying the program bytes.
var programBuf = TF_Buffer(data: programByteAddress,
length: programByteCount,
data_deallocator: nil)
let graphDefOptions = TF_NewImportGraphDefOptions()
TF_GraphImportGraphDef(newGraph, &programBuf, graphDefOptions, status)
TF_DeleteImportGraphDefOptions(graphDefOptions)
checkOk(status)
debugLog("Done loading a new program.")
// Prepare session options for initializing a session.
let sessionOptions = TF_NewSessionOptions()
// If needed, enable XLA compilation in session options.
if _RuntimeConfig.executionMode == .xla {
debugLog("Enable XLA execution.")
TF_EnableXLACompilation(sessionOptions, 1)
}
// If needed, enable remote execution in session options.
if case .remote(let grpcAddress) = _RuntimeConfig.session {
debugLog("Set TensorFlow server to \(grpcAddress).")
TF_SetTarget(sessionOptions, grpcAddress)
}
// Create a new session using the session options above.
let maybeNewSession = TF_NewSession(newGraph, sessionOptions, status)
checkOk(status)
let newSession = maybeNewSession!
TF_DeleteSessionOptions(sessionOptions)
// If this is the first session in TPU mode, make sure TPU is
// reset/initialized.
if loadedTFPrograms.isEmpty, _RuntimeConfig.executionMode.isTPU {
initializeTPU(withSession: newSession, graph: newGraph, status: status)
}
// Cache the session.
let newInstance = (graph: newGraph, session: newSession)
loadedTFPrograms[programByteAddress] = newInstance
return newInstance
}
}
}
@_versioned
internal func dumpTensorContent<Scalar : AccelerableByTensorFlow>(
_ inputTensor: CTensorHandle, _: Scalar.Type
) {
let array = ShapedArray<Scalar>(cTensorHandle: inputTensor)
debugLog("Rank is \(array.rank), shape is \(array.shape).")
debugLog("""
    The contents of the \(array.scalars.count) scalars are: \
\(array.scalars).
""")
}
@_versioned
internal func dumpCTensorHandleContent(
_ idx: Int,
_ inputTensorHandle: CTensorHandle) {
let dType: TF_DataType = TFE_TensorHandleDataType(inputTensorHandle)
debugLog("Tensor \(idx) has TF data type \(dType).")
switch dType {
case TF_INT8: dumpTensorContent(inputTensorHandle, Int8.self)
case TF_UINT8: dumpTensorContent(inputTensorHandle, UInt8.self)
case TF_INT16: dumpTensorContent(inputTensorHandle, Int16.self)
case TF_UINT16: dumpTensorContent(inputTensorHandle, UInt16.self)
case TF_UINT32: dumpTensorContent(inputTensorHandle, UInt32.self)
case TF_INT32: dumpTensorContent(inputTensorHandle, Int32.self)
case TF_UINT64: dumpTensorContent(inputTensorHandle, UInt64.self)
case TF_INT64: dumpTensorContent(inputTensorHandle, Int64.self)
case TF_FLOAT: dumpTensorContent(inputTensorHandle, Float.self)
case TF_DOUBLE: dumpTensorContent(inputTensorHandle, Double.self)
case TF_BOOL: dumpTensorContent(inputTensorHandle, Bool.self)
default: fatalError("Unsupported dtype \(dType)")
}
}
private class TFState {
let status: CTFStatus = TF_NewStatus()
/// The TF_Session to execute the function.
fileprivate let cSession: CTFSession
/// The graph that contains the function to execute. Not owned.
let graph: CTFGraph
/// The input tensors.
var inputTensors: [CTensor?] = []
init(_ programByteAddress: UnsafeRawPointer, programByteCount: Int) {
let context = _ExecutionContext.global
(graph, cSession) = context.session(forProgram: programByteAddress,
programByteCount: programByteCount)
}
deinit {
TF_DeleteStatus(status)
}
}
extension TFState {
func addInput(_ inputTensorHandle: CTensorHandle) {
// We assume the input tensors live in host memory.
let cTensor = TFE_TensorHandleResolve(inputTensorHandle, status)
checkOk(status)
inputTensors.append(cTensor!)
}
  /// See the comment on _TensorComputation.helperFunctionCount for the concept
  /// of a "helper function".
func execute(_ entryFunctionBaseName: String,
helperFunctionCount: Int,
returnValues: inout [CTensorHandle?]) {
let funcNodeName = "tfc_func_" + entryFunctionBaseName
let funcNode = TF_GraphOperationByName(graph, funcNodeName)
internalConsistencyCheck(
funcNode != nil,
"Cannot find func node name \(funcNodeName)"
)
internalConsistencyCheck(
TF_OperationNumOutputs(funcNode) == returnValues.count
)
// Prepare input related parameters for TF_SessionRun().
var inputNodeSpecs: [TF_Output] = []
for i in 0..<inputTensors.count {
let inputNodeName = "tfc_input_\(i)_\(entryFunctionBaseName)"
let inputNode = TF_GraphOperationByName(graph, inputNodeName)
internalConsistencyCheck(inputNode != nil,
"Cannot find input node name \(inputNodeName)")
inputNodeSpecs.append(TF_Output(oper: inputNode, index: 0))
}
// Prepare output related parameters for TF_SessionRun().
var outputNodeSpecs: [TF_Output] = []
for i in 0..<returnValues.count {
let outputNodeName = "tfc_output_\(i)_\(entryFunctionBaseName)"
let outputNode = TF_GraphOperationByName(graph, outputNodeName)
internalConsistencyCheck(outputNode != nil,
"Cannot find output node name \(outputNodeName)")
outputNodeSpecs.append(TF_Output(oper: outputNode, index: 0))
}
var outputTensors: [CTensor?] = Array(repeating: nil,
count: returnValues.count)
// Prepare target related parameters for TF_SessionRun().
// A vector of TF_Operation* objects.
var targetNodeSpecs: [OpaquePointer?] = []
for i in 0..<helperFunctionCount {
let helperFuncNodeName = "tfc_func_\(entryFunctionBaseName)_helper_\(i)"
let helperFuncNode = TF_GraphOperationByName(graph, helperFuncNodeName)
guard helperFuncNode != nil else {
fatalError("Cannot find helper func node name \(helperFuncNodeName)")
}
targetNodeSpecs.append(helperFuncNode)
}
    if !returnValues.isEmpty {
if _RuntimeConfig.executionMode.isTPU {
debugLog("Enable TPU execution.")
// When infeed is enabled, run it along with the output tensor nodes
// below.
let infeedEnqueueNode = TF_GraphOperationByName(graph,
"InfeedEnqueueTuple")
if let infeedEnqueueNode = infeedEnqueueNode {
targetNodeSpecs.append(infeedEnqueueNode)
debugLog("Running enqueue with \(inputTensors.count) input tensors.")
}
}
debugLog("""
      Calling TF_SessionRun on function \(entryFunctionBaseName), with \
\(targetNodeSpecs.count) target nodes.
""")
TF_SessionRun(
cSession, nil,
// input related parameters
inputNodeSpecs, inputTensors, Int32(inputTensors.count),
// output related parameters
outputNodeSpecs, &outputTensors, Int32(returnValues.count),
// target related parameters
targetNodeSpecs, Int32(targetNodeSpecs.count),
/*run_metadata*/nil, status
)
checkOk(status)
debugLog("Done running TF computation.")
} else {
// TF_SessionRun() does not support execution that involves no
// outputs. In this case we assume the TF execution is side-effect free,
// so skipping is a valid optimization without changing behavior.
//
// This case usually only occurs in compiler-only unit tests, where the
// generated TF program does not produce any outputs to be consumed by
// Swift host code.
debugLog("Skipping calling TF_SessionRun since there are no outputs.")
}
// Delete input tensors.
for inputTensor in inputTensors {
TF_DeleteTensor(inputTensor)
}
// Synthesize TFE tensor handles to work with the existing Swift TF
// library code.
for i in 0..<returnValues.count {
assert(outputTensors[i] != nil)
returnValues[i] = TFE_NewTensorHandle(outputTensors[i], status)
checkOk(status)
TF_DeleteTensor(outputTensors[i])
if _RuntimeConfig.printsDebugLog {
dumpCTensorHandleContent(i, returnValues[i]!)
}
}
}
}
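// The graph-node naming convention consumed by execute() above, illustrated
// for a hypothetical entry function "foo" with two inputs, one output, and one
// helper function:
//
//   tfc_func_foo             // primary function node
//   tfc_input_0_foo          // input placeholders
//   tfc_input_1_foo
//   tfc_output_0_foo         // output node
//   tfc_func_foo_helper_0    // helper function node (run as a target)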
//===----------------------------------------------------------------------===//
// - MARK: Tensor computation
//===----------------------------------------------------------------------===//
/// Tensor computation.
///
/// - Note: The call sequence for the APIs below must be one of the two:
/// `init -> terminate()` or `init -> finish()`.
/// The finish/terminate APIs may only be called once.
@_fixed_layout
public final class _TensorComputation {
/// The status for checking TensorFlow errors.
let status: CTFStatus = TF_NewStatus()
/// The base name for the set of graph functions to execute in this instance.
let entryFunctionBaseName: String
/// The number of helper functions associated with this TF graph
/// execution. For a graph execution over N TF devices, there is a single
/// *primary* graph function, responsible for taking input and producing
/// output tensors as part of the SessionRun() spec. The other N-1 graph
/// functions are *helper* functions, executed together with the primary one
/// for their side-effects, such as sending/receiving tensors, "helping" the
/// primary function produce the desired output tensors and side effects as
/// needed by the SessionRun() call.
let helperFunctionCount: Int
/// The tensor handles returned by the tensor program.
// TODO(hongm): Retire returnValues when eager based runtime is removed.
var returnValues: [CTensorHandle?]
// NOTE: the following properties are intentionally not implemented as an enum
// due to high churn, *please do not refactor for Swiftiness*.
private var stateTF: TFState?
/// The thread to run tensor computation in. The global config flag
/// '_RuntimeConfig.usesSynchronousExecution' decides whether tensor
/// computation should be synchronous: if true, this property will always be
/// nil.
private var pthread: pthread_t?
  /// Loads the TF program from a binary TF FunctionDef proto given by
  /// 'programByteAddress' and 'programByteCount', and starts the computation.
///
/// - Parameters:
/// - programByteAddress: The address of the raw program.
/// - programByteCount: The number of bytes in the program.
/// - entryFunctionBaseNameAddress: The base name of the functions to run.
/// - tensorArgumentAddress: The address to the buffer containing tensor
/// arguments as CTensorHandle.
/// - tensorArgumentCount: The number of tensor arguments to pass in.
/// - helperFunctionCount: The number of helper functions to run.
/// - resultCount: The number of output tensors.
///
/// - TODO(clattner): `resultCount` should go away when the runtime is
/// implemented with an async design.
@_versioned
init(programByteAddress: UnsafeRawPointer,
programByteCount: Int,
entryFunctionBaseNameAddress: UnsafePointer<Int8>,
tensorArgumentAddress: UnsafePointer<CTensorHandle>,
tensorArgumentCount: Int,
helperFunctionCount: Int,
resultCount: Int) {
configureRuntimeFromEnvironment()
let inputTensorHandles = UnsafeBufferPointer(start: tensorArgumentAddress,
count: tensorArgumentCount)
// Get global execution context, which caches all our tensor programs.
let context = _ExecutionContext.global
if _RuntimeConfig.printsDebugLog {
let buffer = UnsafeBufferPointer(
start: programByteAddress.assumingMemoryBound(to: UInt8.self),
count: programByteCount)
debugLog("The program bytes are \(Array(buffer)).")
}
    // Now that the program is in our context, we can prepare to look up the
    // top-level function and create an op.
self.entryFunctionBaseName = String(cString: entryFunctionBaseNameAddress)
debugLog("""
Creating a new op with func base name \
\(String(cString: entryFunctionBaseNameAddress)).
""")
self.helperFunctionCount = helperFunctionCount
self.stateTF = TFState(programByteAddress,
programByteCount: programByteCount)
debugLog("Done initializing TF-specific state.")
if _RuntimeConfig.executionMode == .gpu {
guard context.gpuDeviceName != nil else {
fatalError("""
The availability of a GPU device should have been confirmed \
at this point.
""")
}
}
debugLog("Populating the op's input list.")
for (i, inputTensorHandle) in inputTensorHandles.enumerated() {
if _RuntimeConfig.printsDebugLog {
dumpCTensorHandleContent(i, inputTensorHandle)
}
guard let stateTF = stateTF else {
fatalError("stateTF must be defined.")
}
stateTF.addInput(inputTensorHandle)
}
debugLog("Created returning info.")
self.returnValues = Array(repeating: nil, count: resultCount)
debugLog("Starting TF graph execution.")
// If it's asynchronous, we start a pthread that calls execute().
if !_RuntimeConfig.usesSynchronousExecution {
// The function to launch in the parallel thread.
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
typealias ThreadBody = @convention(c)
(UnsafeMutableRawPointer) -> UnsafeMutableRawPointer?
#else
typealias ThreadBody = @convention(c)
(UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer?
#endif
let body: ThreadBody = { arg in
// Set the cancelability of the detached thread.
pthread_setcanceltype(Int32(PTHREAD_CANCEL_DEFERRED), nil)
// Execute the tensor computation.
#if !(os(macOS) || os(iOS) || os(watchOS) || os(tvOS))
let arg = arg!
#endif
let computation: _TensorComputation =
Unmanaged.fromOpaque(arg).takeRetainedValue()
computation.execute()
checkOk(computation.status)
return nil
}
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
var newThread: pthread_t?
#else
var newThread = pthread_t()
#endif
let creationStatus = pthread_create(
&newThread, nil, body,
Unmanaged.passRetained(self).toOpaque()
)
self.pthread = newThread
// TODO(hongm): do error handling.
internalConsistencyCheck(creationStatus == 0)
}
    // If it's synchronous, we call execute() on the main thread directly.
else {
// Log a debug message to differentiate from async computation.
debugLog("Running tensor computation synchronously.")
execute()
}
debugLog("Exiting _TensorComputation.init().")
}
deinit {
debugLog("De-initializing _TensorComputation.")
TF_DeleteStatus(status)
}
}
private extension _TensorComputation {
  // NOTE: This is to be called by the initializer. The computation gets
  // executed on initialization, so this method is not exposed to users.
private func execute() {
debugLog("Executing TF function \(entryFunctionBaseName).")
guard let stateTF = stateTF else {
fatalError("stateTF must be defined.")
}
stateTF.execute(entryFunctionBaseName,
helperFunctionCount: helperFunctionCount,
returnValues: &returnValues)
debugLog("Done execution.")
}
}
public extension _TensorComputation {
  /// Terminates the computation and cleans up the state.
func terminate() {
if let pthread = pthread {
// TODO(hongm): Assess TF's thread cancel support.
let cancelStatus = pthread_cancel(pthread)
internalConsistencyCheck(cancelStatus == 0)
self.pthread = nil
}
}
  /// Waits for completion of the computation, and returns output handles whose
  /// underlying tensors may live on CPU or GPU.
func finish() -> [CTensorHandle] {
debugLog("Calling _TensorComputation.finish().")
if let pthread = pthread {
debugLog("Waiting for thread to join.")
let joinStatus = pthread_join(pthread, nil)
internalConsistencyCheck(joinStatus == 0)
self.pthread = nil
}
debugLog("Done executing TF graph.")
// Now that all the elements have been filled in, remove a level of
// optional.
return returnValues.map { $0! }
}
}
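// Call-sequence sketch (see the note on the class above): after init starts
// the computation, exactly one of finish() or terminate() must follow.
//
//   let outputs = computation.finish()  // joins the worker thread, if any
//   // -- or, to abandon an in-flight asynchronous computation --
//   computation.terminate()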
extension _TensorComputation {
@_versioned
var cSession: CTFSession {
if let stateTF = stateTF {
return stateTF.cSession
}
fatalError("No TF Session is available!")
}
}
//===----------------------------------------------------------------------===//
// - MARK: Compiler runtime entrypoints
//===----------------------------------------------------------------------===//
// These are the entrypoints that are well-known to the compiler internals. The
// signatures and forms must not be changed without updating the compiler. Any
// code put into the body of these functions will end up being inlined into the
// user code, so they are generally just wrappers around the implementation
// above.
/// Loads the TF computation from a binary TF FunctionDef proto given by
/// 'programByteAddress' and 'programByteCount', starts the computation, and
/// returns a _TensorComputation object as a unique identifier for that
/// computation.
///
/// - Parameters:
/// - programByteAddress: The address of the raw program.
/// - programByteCount: The number of bytes in the program.
/// - entryFunctionBaseNameAddress: The base name of the functions to run.
/// - tensorArgumentAddress: The address to the buffer containing tensor
/// arguments as CTensorHandle.
/// - tensorArgumentCount: The number of tensor arguments to pass in.
/// - helperFunctionCount: The number of helper functions to run.
/// - resultCount: The number of output tensors.
@_inlineable
@_silgen_name("_swift_tfc_StartTensorComputation")
public func _TFCStartTensorComputation(
_ programByteAddress: UnsafeRawPointer,
_ programByteCount: Int,
_ entryFunctionBaseNameAddress: UnsafePointer<Int8>,
_ tensorArgumentAddress: UnsafePointer<CTensorHandle>,
_ tensorArgumentCount: Int,
_ helperFunctionCount: Int,
_ resultCount: Int
) -> _TensorComputation {
debugLog("""
_TFCStartTensorComputation() is called with \(programByteCount) \
  program bytes, \(tensorArgumentCount) input tensors, \
  \(String(cString: entryFunctionBaseNameAddress)) as the func base name, \
\(helperFunctionCount) helper functions, and \(resultCount) output tensors.
""")
return _TensorComputation(programByteAddress: programByteAddress,
programByteCount: programByteCount,
entryFunctionBaseNameAddress: entryFunctionBaseNameAddress,
tensorArgumentAddress: tensorArgumentAddress,
tensorArgumentCount: tensorArgumentCount,
helperFunctionCount: helperFunctionCount,
resultCount: resultCount)
}
/// Waits for completion of the computation as given by `computation`, and
/// returns results.
///
/// - Parameters:
/// - computation: The tensor computation to finish.
/// - tensorResultAddress: The address to an uninitialized buffer to accept
/// results of the computation, where the output tensors may live on CPU or
/// GPU.
/// - tensorResultCount: The number of results to accept from the computation.
/// - Note: The result address as passed in points to uninitialized memory;
///   this function must initialize the memory, transferring ownership of the
///   tensor handles to the caller.
@_inlineable
@_silgen_name("_swift_tfc_FinishTensorComputation")
public func _TFCFinishTensorComputation(
_ computation: _TensorComputation,
_ tensorResultAddress: UnsafeMutablePointer<CTensorHandle>,
_ tensorResultCount: Int
) {
debugLog("Expecting \(tensorResultCount) output tensors.")
let results = computation.finish()
internalConsistencyCheck(results.count == tensorResultCount,
"internal compiler error: result count mismatch!")
tensorResultAddress.initialize(from: results, count: tensorResultCount)
}
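// A sketch of the call sequence the compiler emits around these entrypoints
// (the variable names and counts are illustrative):
//
//   let computation = _TFCStartTensorComputation(
//     programAddress, programByteCount, entryNameAddress,
//     tensorArgsAddress, tensorArgCount, helperFunctionCount, resultCount)
//   // ... host code runs concurrently with the tensor program ...
//   _TFCFinishTensorComputation(computation, resultAddress, resultCount)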
/// Terminates the computation as given by 'computation', and cleans up the
/// state.
///
/// - Parameters:
///   - computation: The tensor computation to terminate.
/// - Note: If the execution was synchronous, this function does nothing.
@_inlineable
@_silgen_name("_swift_tfc_TerminateTensorComputation")
public func _TFCTerminateTensorComputation(_ computation: _TensorComputation) {
computation.terminate()
}
/// Creates a scalar CTensorHandle value for the given data type.
///
/// - Parameters:
/// - value: The scalar value.
/// - dtype: The TF data type of the tensor handle to create.
/// - Returns: A new CTensorHandle representing the scalar.
/// - Precondition: T must conform to AccelerableByTensorFlow and 'dtype' must
/// be equal to T's corresponding data type.
/// - TODO(rxwei): Constrain T to AccelerableByTensorFlow and remove the
/// precondition. This requires the compiler to emit a call to the generic
/// function.
@_inlineable
@_silgen_name("_swift_tfc_CreateCTensorHandle")
public func _TFCCreateCTensorHandle<T>(_ value: T,
_ dtype: TF_DataType) -> CTensorHandle {
// Create a new CTensor and initialize it to the scalar value.
let tensor = TF_AllocateTensor(dtype, nil, 0, MemoryLayout<T>.stride)
TF_TensorData(tensor).assumingMemoryBound(to: T.self).initialize(to: value)
// Create a new CTensorHandle from the CTensor.
let status = TF_NewStatus()
let cTensorHandle = TFE_NewTensorHandle(tensor, status)
checkOk(status)
TF_DeleteStatus(status)
TF_DeleteTensor(tensor)
return cTensorHandle!
}
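// Usage sketch (per the precondition, the dtype must match T):
//
//   let handle = _TFCCreateCTensorHandle(Float(1.0), TF_FLOAT)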