# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for wrapping an eager op in a call op at runtime."""
import time

from tensorflow.python.data.experimental.ops import prefetching_ops
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.eager import benchmarks_test_base
from tensorflow.python.eager import context
from tensorflow.python.eager import test
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import bitwise_ops
from tensorflow.python.ops import critical_section_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops.ragged import ragged_factory_ops
from tensorflow.python.ops.ragged import ragged_map_ops
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.util import tf_inspect


def run_benchmark(func, num_iters, unused_execution_mode):
  # Run once outside the timed loop so one-time setup costs (e.g. tracing or
  # kernel instantiation) are not measured.
  func()
  start = time.time()
  for _ in range(num_iters):
    func()
  end = time.time()
  return end - start
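
# run_benchmark returns the total wall-clock seconds for num_iters calls; the
# run_report helper from benchmarks_test_base is assumed to convert this into
# a mean per-iteration time in microseconds (see report_mean_us=True below).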

CPU = "/device:CPU:0"
GPU = "/device:GPU:0"


# TODO(srbs): Why can't we use absl parameterized here?
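# Note: test_util.with_eager_op_as_function presumably parameterizes the class
# so each benchmark runs both with and without the mode; _get_benchmark_name
# below appends "_eager_op_as_function" to keep the reported names distinct.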
@test_util.with_eager_op_as_function
class MicroBenchmarks(benchmarks_test_base.MicroBenchmarksBase):

  def __init__(self):
    super().__init__()
    self._m_2_by_2 = random_ops.random_uniform((2, 2))
    self._m_2_by_2_int32 = random_ops.random_uniform(
        (2, 2), maxval=5, dtype=dtypes.int32)
    self._m_100_by_100 = random_ops.random_uniform((100, 100))
    self._m_100_by_100_int32 = random_ops.random_uniform(
        (100, 100), maxval=5, dtype=dtypes.int32)
    self._m_1000_by_1000 = random_ops.random_uniform((1000, 1000))
    self._m_1000_by_1000_int32 = random_ops.random_uniform(
        (1000, 1000), maxval=5, dtype=dtypes.int32)

  def _get_benchmark_name(self):
    """Copied from benchmarks_test.py."""
    stack = tf_inspect.stack()
    name = None
    for frame in stack[::-1]:
      f_locals = frame[0].f_locals
      f_self = f_locals.get("self", None)
      if isinstance(f_self, test.Benchmark):
        name = frame[3]  # Get the method name.
        # Some benchmark methods are wrapped in a disable_tfrt decorator, in
        # which case the frame found here is a wrapper named "decorated". Keep
        # walking one frame deeper to recover the real method name.
        if name == "decorated":
          continue
        else:
          break
    if name is None:
      raise ValueError("Unable to determine calling Benchmark function.")
    if context.is_tfrt_enabled():
      name = name + "_tfrt"
    if context.run_eager_op_as_function_enabled():
      name = name + "_eager_op_as_function"
    return name
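
  # For example (hypothetical run): benchmark_tf_matmul_2_by_2_CPU executed
  # with eager-op-as-function enabled would be reported as
  # "benchmark_tf_matmul_2_by_2_CPU_eager_op_as_function".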

  def _run(self, func, num_iters):
    self.run_report(run_benchmark, func, num_iters, report_mean_us=True)

  def _benchmark_matmul(self, mat, device):
    if device == GPU and not context.num_gpus():
      return
    with context.device(device):
      if device == GPU:
        # Copy the input to the GPU up front so the timed loop does not
        # include a host-to-device transfer.
        mat = mat.gpu()
      func = lambda: math_ops.matmul(mat, mat)
      self._run(func, num_iters=5000)

  def _benchmark_bitwise_and(self, mat, device):
    if device == GPU and not context.num_gpus():
      return
    with context.device(device):
      if device == GPU:
        mat = mat.gpu()
      func = lambda: bitwise_ops.bitwise_and(mat, mat)
      self._run(func, num_iters=5000)

  def _benchmark_random_normal(self, device):
    if device == GPU and not context.num_gpus():
      return
    with context.device(device):

      def func():
        mat = constant_op.constant([3], dtypes.int32)
        s = mat + mat
        random_ops.random_normal(shape=s)

      self._run(func, num_iters=5000)

  # This only needs to be tested on GPU, where redundant data transfers
  # between host and device can occur.
  def benchmark_random_normal_GPU(self):
    self._benchmark_random_normal(GPU)

  def benchmark_tf_matmul_2_by_2_CPU(self):
    self._benchmark_matmul(self._m_2_by_2, CPU)

  def benchmark_tf_bitwise_and_2_by_2_CPU(self):
    self._benchmark_bitwise_and(self._m_2_by_2_int32, CPU)

  def benchmark_tf_matmul_2_by_2_GPU(self):
    self._benchmark_matmul(self._m_2_by_2, GPU)

  def benchmark_tf_bitwise_and_2_by_2_GPU(self):
    self._benchmark_bitwise_and(self._m_2_by_2_int32, GPU)

  def benchmark_tf_matmul_100_by_100_CPU(self):
    self._benchmark_matmul(self._m_100_by_100, CPU)

  def benchmark_tf_bitwise_and_100_by_100_CPU(self):
    self._benchmark_bitwise_and(self._m_100_by_100_int32, CPU)

  def benchmark_tf_matmul_100_by_100_GPU(self):
    self._benchmark_matmul(self._m_100_by_100, GPU)

  def benchmark_tf_bitwise_and_100_by_100_GPU(self):
    self._benchmark_bitwise_and(self._m_100_by_100_int32, GPU)

  def benchmark_tf_matmul_1000_by_1000_CPU(self):
    self._benchmark_matmul(self._m_1000_by_1000, CPU)

  def benchmark_tf_bitwise_and_1000_by_1000_CPU(self):
    self._benchmark_bitwise_and(self._m_1000_by_1000_int32, CPU)

  def benchmark_tf_matmul_1000_by_1000_GPU(self):
    self._benchmark_matmul(self._m_1000_by_1000, GPU)

  def benchmark_tf_bitwise_and_1000_by_1000_GPU(self):
    self._benchmark_bitwise_and(self._m_1000_by_1000_int32, GPU)


@test_util.with_eager_op_as_function
class RunEagerOpAsFunctionTest(test.TestCase):

  def setUp(self):
    super().setUp()
    self._m_2_by_2 = random_ops.random_uniform((2, 2))

  def testDefaultAttrValues(self):
    ragged_map_ops.map_fn(
        fn=lambda x: x,
        elems=ragged_factory_ops.constant([[7]]),
        dtype=ragged_tensor.RaggedTensorType(
            dtype=dtypes.int32, ragged_rank=1))

  def testArrayFill(self):
    array_ops.fill(
        constant_op.constant([2], dtype=dtypes.int64),
        constant_op.constant(1))

  def testDatasetMap(self):
    # When a GPU is available, this also tests that the wrapped call ops are
    # placed on the CPU (i.e. that the device is selected based on the
    # unwrapped op).
    dataset_ops.Dataset.range(2).map(math_ops.square)

  def testPrefetchToDevice(self):
    if not context.num_gpus():
      self.skipTest("No GPU available")
    dataset = dataset_ops.Dataset.range(10)
    dataset = dataset.apply(prefetching_ops.prefetch_to_device("/gpu:0"))

  def testMatmul(self):
    math_ops.matmul(self._m_2_by_2, self._m_2_by_2)
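
  # The *FastPath tests below presumably hit the C++ fast path for eager
  # execution (tensor inputs), while the *EagerFallback variants pass plain
  # Python values and so take the slower conversion path.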

  def testMixedTypeListInputFastPath(self):
    array_ops.identity_n([self._m_2_by_2, self._m_2_by_2])

  def testMixedTypeListInputEagerFallback(self):
    array_ops.identity_n([1, 1])

  def testMixedTypeListInputFastPathDifferentArity(self):
    # Tests that the FunctionDef cache key includes the number of arguments.
    array_ops.identity_n([self._m_2_by_2, self._m_2_by_2])
    array_ops.identity_n([self._m_2_by_2, self._m_2_by_2, self._m_2_by_2])

  def testMixedTypeListInputEagerFallbackDifferentArity(self):
    array_ops.identity_n([1, 1])
    array_ops.identity_n([1, 1, 1])

  def testSingleTypeListFastPath(self):
    array_ops.concat([self._m_2_by_2, self._m_2_by_2], axis=-1)

  def testSingleTypeListEagerFallback(self):
    array_ops.concat([[1], [2]], axis=-1)

  def testSingleTypeListFastPathDifferentArity(self):
    array_ops.concat([self._m_2_by_2, self._m_2_by_2], axis=-1)
    array_ops.concat([self._m_2_by_2, self._m_2_by_2, self._m_2_by_2], axis=-1)

  def testSingleTypeListEagerFallbackDifferentArity(self):
    array_ops.concat([[1], [2]], axis=-1)
    array_ops.concat([[1], [2], [3]], axis=-1)

  def testCreateCriticalSection(self):
    cs = critical_section_ops.CriticalSection(shared_name="cs")
    cs.execute(lambda: 1.0)


class RunEagerOpAsFunctionInternalsTest(test.TestCase):
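
  # test_util.TestDelta appears to snapshot a named runtime counter at
  # construction, with Get() returning how far the counter has advanced since
  # then; the assertions below use it to check which executor and sync/async
  # code paths were exercised.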

  @test_util.enable_eager_op_as_function
  def testSimpleGraphExecutesSynchronously(self):
    if context.num_gpus():
      self.skipTest("CPU-only test (requires unpartitioned graph).")
    default_executor = test_util.TestDelta("flr_executor", "default")
    single_threaded = test_util.TestDelta("flr_executor", "single_threaded")
    run_async = test_util.TestDelta("pflr_runsync", "async")
    run_sync = test_util.TestDelta("pflr_runsync", "sync")
    safe = test_util.TestDelta("subgraph_async_summary", "safe_for_sync")
    array_ops.fill([2], constant_op.constant(7, dtype=dtypes.int64))
    self.assertEqual(default_executor.Get(), 0)
    self.assertGreater(single_threaded.Get(), 0)
    self.assertEqual(run_async.Get(), 0)
    self.assertGreater(run_sync.Get(), 0)
    self.assertGreater(safe.Get(), 0)

  @test_util.enable_eager_op_as_function
  def testSendRecvPartitionedGraphExecutesSynchronously(self):
    if not context.num_gpus():
      self.skipTest("GPU-only test (requires partitioned graph).")
    default_executor = test_util.TestDelta("flr_executor", "default")
    single_threaded = test_util.TestDelta("flr_executor", "single_threaded")
    run_async = test_util.TestDelta("pflr_runsync", "async")
    run_sync = test_util.TestDelta("pflr_runsync", "sync")
    send_only = test_util.TestDelta("subgraph_async_summary", "send_only")
    recv_only = test_util.TestDelta("subgraph_async_summary", "recv_only")
    array_ops.fill([2], constant_op.constant(7, dtype=dtypes.int64))
    self.assertEqual(default_executor.Get(), 0)
    self.assertGreater(single_threaded.Get(), 0)
    self.assertEqual(run_async.Get(), 0)
    self.assertGreater(run_sync.Get(), 0)
    self.assertGreater(send_only.Get(), 0)
    self.assertGreater(recv_only.Get(), 0)


if __name__ == "__main__":
  test.main()