# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the mirrored values library."""

import os

from absl.testing import parameterized

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from tensorflow.python.distribute import strategy_test_lib
from tensorflow.python.distribute import test_util as ds_test_util
from tensorflow.python.distribute import tpu_values
from tensorflow.python.distribute import values as values_lib
from tensorflow.python.eager import context
from tensorflow.python.eager import test
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.training import saver as saver_lib


def _make_mirrored(distribution=None):
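  """Creates a MirroredVariable with one component variable per device."""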
v = []
if distribution:
devices = distribution.extended.worker_devices
else:
devices = ["/device:GPU:0", "/device:CPU:0"]
for d, n, init in zip(devices, ["v", "v/replica"], [1., 2.]):
with ops.device(d):
v.append(
variable_scope.get_variable(
name=n, initializer=init, use_resource=True))
  if (distribution is not None and
      strategy_test_lib.is_tpu_strategy(distribution)):
var_cls = tpu_values.TPUMirroredVariable
else:
var_cls = values_lib.MirroredVariable
mirrored = var_cls(distribution, v, variable_scope.VariableAggregation.SUM)
return mirrored


def _make_mirrored_val(init_val=5.0):
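  """Creates a Mirrored value with a constant `init_val` on each device."""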
v = []
devices = ["/device:GPU:0", "/device:CPU:0"]
for d, _ in zip(devices, ["v", "v/replica"]):
with ops.device(d):
v.append(constant_op.constant(init_val))
return values_lib.Mirrored(v)


def mirrored_and_tpu_strategy_combinations():
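  """Returns graph and eager test combinations over mirrored and TPU strategies."""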
return combinations.combine(
distribution=[
strategy_combinations.mirrored_strategy_with_gpu_and_cpu,
strategy_combinations.mirrored_strategy_with_two_gpus_no_merge_call,
strategy_combinations.tpu_strategy,
strategy_combinations.tpu_strategy_packed_var,
],
mode=["graph", "eager"])


class MirroredVariableTest(test.TestCase, parameterized.TestCase):
config = config_pb2.ConfigProto()
config.allow_soft_placement = True

  @test_util.run_in_graph_and_eager_modes(config=config)
def testProperties(self):
if context.num_gpus() < 1 and context.executing_eagerly():
self.skipTest("A GPU is not available for this test in eager mode.")
mirrored = _make_mirrored()
v = mirrored.values[0]
self.assertEqual(v.name, mirrored.name)
self.assertEqual(v.dtype, mirrored.dtype)
self.assertEqual(v.shape, mirrored.shape)

  @test_util.run_in_graph_and_eager_modes(config=config)
def testVariableOnAnotherDevice(self):
v = variable_scope.get_variable(
name="v", initializer=[1.], use_resource=True)
mirrored = values_lib.MirroredVariable(
None, (v,), variable_scope.VariableAggregation.MEAN)
self.assertEqual(v.name, mirrored.name)
self.assertEqual(v.dtype, mirrored.dtype)
self.assertEqual(v.shape, mirrored.shape)


class MirroredVariableSaveRestoreTest(test.TestCase, parameterized.TestCase):

  def _assign_mirrored(self, v, new):
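    """Assigns `new[i]` to the i-th component variable of `v`."""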
for var, n in zip(v.values, new):
self.evaluate(var.assign(n))

  def _save_return_saver(self, sess, var):
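    """Saves `var` to a temporary checkpoint and returns (save_path, saver)."""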
saver = saver_lib.Saver(var_list=[var])
test_dir = self.get_temp_dir()
prefix = os.path.join(test_dir, "ckpt")
return saver.save(sess, prefix), saver

  def _save(self, sess, var):
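    """Saves `var` to a temporary checkpoint and returns the save path."""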
save_path, _ = self._save_return_saver(sess, var)
return save_path

  def _save_mirrored(self, distribution):
    """Saves variables with mirroring and returns the save path."""
with self.session(graph=ops.Graph()) as sess:
mirrored = _make_mirrored(distribution)
# Overwrite the initial values.
self._assign_mirrored(mirrored, [3., 4.])
# Saves the current value of v[0], 3.
save_path = self._save(sess, mirrored)
# Change the values between save and restore.
self._assign_mirrored(mirrored, [5., 6.])
return save_path

  def _save_normal(self):
    """Saves variables without mirroring and returns the save path."""
with self.session(graph=ops.Graph()) as sess:
var = variable_scope.get_variable(
name="v", initializer=1., use_resource=True)
# Overwrite the initial value.
self.evaluate(var.assign(3.))
# Saves the current value of var, 3.
save_path = self._save(sess, var)
# Change the values between save and restore.
self.evaluate(var.assign(5.))
return save_path

  def _restore_normal(self, save_path):
    """Restores into a non-mirrored variable in a fresh graph."""
with self.session(graph=ops.Graph()) as sess:
var = variable_scope.get_variable(
name="v", initializer=7., use_resource=True)
# Overwrite the initial value.
self.evaluate(var.assign(8.))
# Restores the saved value of 3. to `var`.
saver = saver_lib.Saver(var_list=[var])
saver.restore(sess, save_path)
self.assertEqual(3., self.evaluate(var))

  def _restore_mirrored(self, save_path, distribution):
    """Restores into mirrored variables in a fresh graph."""
with self.session(graph=ops.Graph()) as sess:
mirrored = _make_mirrored(distribution)
v = mirrored.values
# Overwrite the initial values.
self._assign_mirrored(mirrored, [7., 8.])
# Restores the saved value of 3. to both variables.
saver = saver_lib.Saver(var_list=[mirrored])
saver.restore(sess, save_path)
self.assertEqual([3., 3.], self.evaluate([v[0], v[1]]))

  @combinations.generate(mirrored_and_tpu_strategy_combinations())
def testSaveAndRestoreMirroredOneGraph(self, distribution):
with self.cached_session() as sess:
mirrored = _make_mirrored(distribution)
      v = mirrored.values
# Overwrite the initial values.
self._assign_mirrored(mirrored, [3., 4.])
# Saves the current value of v[0], 3.
save_path, saver = self._save_return_saver(sess, mirrored)
# Change the values between save and restore.
self._assign_mirrored(mirrored, [5., 6.])
# Restores the saved value of 3. to both variables.
saver.restore(sess, save_path)
self.assertEqual([3., 3.], self.evaluate([v[0], v[1]]))

  @combinations.generate(mirrored_and_tpu_strategy_combinations())
def testSaveMirroredRestoreMirrored(self, distribution):
if context.num_gpus() < 1 and context.executing_eagerly():
      # Graph mode can work without a GPU because the Placer "moves" the
      # variable to a CPU. In other words, if there is no GPU available but
      # the user requested to create a variable on GPU, the Placer ignores the
      # request and assigns the VarHandleOp to the CPU. This requires
      # soft_placement, which is on by default.
self.skipTest("A GPU is not available for this test in eager mode.")
save_path = self._save_mirrored(distribution)
self._restore_mirrored(save_path, distribution)

  @combinations.generate(mirrored_and_tpu_strategy_combinations())
def testSaveMirroredRestoreNormal(self, distribution):
if context.num_gpus() < 1 and context.executing_eagerly():
      # Graph mode can work without a GPU because the Placer "moves" the
      # variable to a CPU. In other words, if there is no GPU available but
      # the user requested to create a variable on GPU, the Placer ignores the
      # request and assigns the VarHandleOp to the CPU. This requires
      # soft_placement, which is on by default.
self.skipTest("A GPU is not available for this test in eager mode.")
save_path = self._save_mirrored(distribution)
self._restore_normal(save_path)

  @combinations.generate(mirrored_and_tpu_strategy_combinations())
def testSaveNormalRestoreMirrored(self, distribution):
if context.num_gpus() < 1 and context.executing_eagerly():
      # Graph mode can work without a GPU because the Placer "moves" the
      # variable to a CPU. In other words, if there is no GPU available but
      # the user requested to create a variable on GPU, the Placer ignores the
      # request and assigns the VarHandleOp to the CPU. This requires
      # soft_placement, which is on by default.
self.skipTest("A GPU is not available for this test in eager mode.")
save_path = self._save_normal()
self._restore_mirrored(save_path, distribution)


class MirroredTest(test.TestCase):

  def testAddOp(self):
if context.num_gpus() < 1:
self.skipTest("A GPU is not available for this test.")
mirrored_val = _make_mirrored_val(init_val=3.)
self.assertEqual(self.evaluate(constant_op.constant(6.)),
self.evaluate(mirrored_val + mirrored_val))
self.assertEqual(self.evaluate(constant_op.constant(4.)),
self.evaluate(mirrored_val + 1))
self.assertEqual(self.evaluate(mirrored_val + 1),
self.evaluate(math_ops.add(mirrored_val, 1)))
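    # The overloaded operator and math_ops.add should also return the same type.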
self.assertEqual(type(mirrored_val + 1),
type(math_ops.add(mirrored_val, 1)))


if __name__ == "__main__":
ds_test_util.main()