blob: 7e09ebf7c21bc81130fa8956ccdb1d171bfb545b [file] [log] [blame]
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for SlurmClusterResolver."""
import os
from tensorflow.python.distribute.cluster_resolver.slurm_cluster_resolver import expand_hostlist
from tensorflow.python.distribute.cluster_resolver.slurm_cluster_resolver import expand_tasks_per_node
from tensorflow.python.distribute.cluster_resolver.slurm_cluster_resolver import SlurmClusterResolver
from tensorflow.python.platform import test
from tensorflow.python.training import server_lib
# Alias the mock module bundled with the TF test framework so the
# decorators below can be written as `mock.patch.dict(...)`.
mock = test.mock
class SlurmClusterResolverTest(test.TestCase):
  """Tests for SlurmClusterResolver and its Slurm-syntax parsing helpers.

  Environment-driven tests patch the SLURM_* variables that Slurm would
  export inside a job step, then verify the resulting ClusterSpec, master
  address, and CUDA_VISIBLE_DEVICES assignment.
  """

  def test_expand_hostlist(self):
    """Slurm hostlist expressions expand to the full, ordered node list."""
    self.assertEqual(expand_hostlist('n1'), ['n1'])
    self.assertEqual(expand_hostlist('n[1,3]'), ['n1', 'n3'])
    self.assertEqual(expand_hostlist('n[1-3]'), ['n1', 'n2', 'n3'])
    self.assertEqual(
        expand_hostlist('n[1-2],m5,o[3-4,6,7-9]'),
        ['n1', 'n2', 'm5', 'o3', 'o4', 'o6', 'o7', 'o8', 'o9'])
    # Zero-padded ranges must keep their padding width in the expansion.
    self.assertEqual(
        expand_hostlist('n[0001-0003],m5,o[009-011]'),
        ['n0001', 'n0002', 'n0003', 'm5', 'o009', 'o010', 'o011'])

  def test_expand_tasks_per_node(self):
    """SLURM_STEP_TASKS_PER_NODE syntax (incl. '(xN)' repeats) expands."""
    self.assertEqual(expand_tasks_per_node('2'), [2])
    self.assertEqual(expand_tasks_per_node('2,1,3'), [2, 1, 3])
    self.assertEqual(expand_tasks_per_node('3(x2),2,1'), [3, 3, 2, 1])
    self.assertEqual(
        expand_tasks_per_node('3(x2),2,11(x4)'), [3, 3, 2, 11, 11, 11, 11])
    self.assertEqual(
        expand_tasks_per_node('13(x10)'),
        [13, 13, 13, 13, 13, 13, 13, 13, 13, 13])

  def _verifyClusterSpecEquality(self, cluster_spec, expected_proto):
    """Asserts `cluster_spec` equals `expected_proto` through every

    round-trip representation: directly, via ClusterSpec copy, via
    cluster_def proto, and via the dict form.
    """
    self.assertProtoEquals(expected_proto, cluster_spec.as_cluster_def())
    self.assertProtoEquals(
        expected_proto,
        server_lib.ClusterSpec(cluster_spec).as_cluster_def())
    self.assertProtoEquals(
        expected_proto,
        server_lib.ClusterSpec(cluster_spec.as_cluster_def()).as_cluster_def())
    self.assertProtoEquals(
        expected_proto,
        server_lib.ClusterSpec(cluster_spec.as_dict()).as_cluster_def())

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '0',
          'SLURM_STEP_NUM_TASKS': '3',
          'SLURM_STEP_TASKS_PER_NODE': '1(x3)',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
          'CUDA_VISIBLE_DEVICES': '0',
      })
  def testSimpleRetrievalFromEnv(self):
    """Default-constructed resolver derives everything from SLURM_* vars."""
    slurm_cluster_resolver = SlurmClusterResolver()

    actual_cluster_spec = slurm_cluster_resolver.cluster_spec()
    expected_proto = """
    job { name: 'worker' tasks { key: 0 value: 't02n13:8888' }
                         tasks { key: 1 value: 't02n41:8888' }
                         tasks { key: 2 value: 't02n43:8888' } }
    """
    self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto)
    self.assertEqual(
        slurm_cluster_resolver.master('worker', 0, rpc_layer='grpc'),
        'grpc://t02n13:8888')
    self.assertEqual(slurm_cluster_resolver.num_accelerators(), {'GPU': 1})
    # The pre-set CUDA_VISIBLE_DEVICES must be left untouched.
    self.assertEqual(os.environ['CUDA_VISIBLE_DEVICES'], '0')

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '0',
          'SLURM_STEP_NUM_TASKS': '3',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
      })
  def testSimpleSuccessfulRetrieval(self):
    """Explicit jobs dict splits the 3 tasks into 1 ps + 2 workers."""
    slurm_cluster_resolver = SlurmClusterResolver(
        jobs={
            'ps': 1,
            'worker': 2
        },
        port_base=8888,
        tasks_per_node=1,
        gpus_per_node=1,
        gpus_per_task=1,
        auto_set_gpu=False)

    actual_cluster_spec = slurm_cluster_resolver.cluster_spec()
    expected_proto = """
    job { name: 'ps' tasks { value: 't02n13:8888' } }
    job { name: 'worker' tasks { key: 0 value: 't02n41:8888' }
                         tasks { key: 1 value: 't02n43:8888' } }
    """
    self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto)

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '0',
          'SLURM_STEP_NUM_TASKS': '3',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
      })
  def testSimpleMasterRetrieval(self):
    """master() honors task_type/task_id, the rpc_layer attribute, and
    an explicit rpc_layer argument."""
    slurm_cluster_resolver = SlurmClusterResolver(
        jobs={
            'ps': 1,
            'worker': 2
        },
        port_base=8888,
        tasks_per_node=1,
        gpus_per_node=1,
        gpus_per_task=1,
        auto_set_gpu=False)

    slurm_cluster_resolver.task_type = 'worker'
    slurm_cluster_resolver.task_id = 1
    self.assertEqual(slurm_cluster_resolver.master(), 'grpc://t02n43:8888')

    slurm_cluster_resolver.rpc_layer = 'ab'
    self.assertEqual(slurm_cluster_resolver.master('ps', 0), 'ab://t02n13:8888')
    # An explicit rpc_layer argument overrides the attribute.
    self.assertEqual(
        slurm_cluster_resolver.master('ps', 0, rpc_layer='test'),
        'test://t02n13:8888')

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '0',
          'SLURM_STEP_NUM_TASKS': '3',
          'SLURM_STEP_TASKS_PER_NODE': '1(x3)',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
      })
  def testTaskPerNodeNotSetRetrieval(self):
    """Omitting tasks_per_node falls back to SLURM_STEP_TASKS_PER_NODE."""
    slurm_cluster_resolver = SlurmClusterResolver(
        jobs={
            'ps': 1,
            'worker': 2
        },
        port_base=8888,
        gpus_per_node=1,
        gpus_per_task=1,
        auto_set_gpu=False)

    actual_cluster_spec = slurm_cluster_resolver.cluster_spec()
    expected_proto = """
    job { name: 'ps' tasks { value: 't02n13:8888' } }
    job { name: 'worker' tasks { key: 0 value: 't02n41:8888' }
                         tasks { key: 1 value: 't02n43:8888' } }
    """
    self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto)

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '1',
          'SLURM_STEP_NUM_TASKS': '5',
          'SLURM_STEP_TASKS_PER_NODE': '2(x2),1',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
          'CUDA_VISIBLE_DEVICES': '',
      })
  def testMultiTaskPerNodeRetrieval(self):
    """Uneven tasks-per-node get consecutive ports on shared nodes, and

    auto_set_gpu assigns this process (PROCID 1, second task on t02n13)
    its own GPU.
    """
    slurm_cluster_resolver = SlurmClusterResolver(
        jobs={
            'ps': 1,
            'worker': 4
        },
        port_base=8888,
        gpus_per_node=2,
        gpus_per_task=1,
        auto_set_gpu=True)

    actual_cluster_spec = slurm_cluster_resolver.cluster_spec()
    expected_proto = """
    job { name: 'ps' tasks { value: 't02n13:8888' } }
    job { name: 'worker' tasks { key: 0 value: 't02n13:8889' }
                         tasks { key: 1 value: 't02n41:8888' }
                         tasks { key: 2 value: 't02n41:8889' }
                         tasks { key: 3 value: 't02n43:8888' } }
    """
    self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto)
    # Use self.assertEqual (not a bare assert, which `python -O` strips)
    # for consistency with the other tests.
    self.assertEqual(os.environ['CUDA_VISIBLE_DEVICES'], '1')

  @mock.patch.dict(
      os.environ, {
          'SLURM_PROCID': '1',
          'SLURM_STEP_NUM_TASKS': '5',
          'SLURM_STEP_TASKS_PER_NODE': '2(x2),1',
          'SLURM_STEP_NODELIST': 't02n13,t02n41,t02n43',
          'CUDA_VISIBLE_DEVICES': '',
      })
  def testMultipleGpusPerTaskRetrieval(self):
    """With gpus_per_task=2 the second task on the node gets GPUs 2,3."""
    slurm_cluster_resolver = SlurmClusterResolver(
        jobs={
            'ps': 1,
            'worker': 4
        },
        port_base=8888,
        gpus_per_node=4,
        gpus_per_task=2,
        auto_set_gpu=True)

    actual_cluster_spec = slurm_cluster_resolver.cluster_spec()
    expected_proto = """
    job { name: 'ps' tasks { value: 't02n13:8888' } }
    job { name: 'worker' tasks { key: 0 value: 't02n13:8889' }
                         tasks { key: 1 value: 't02n41:8888' }
                         tasks { key: 2 value: 't02n41:8889' }
                         tasks { key: 3 value: 't02n43:8888' } }
    """
    self._verifyClusterSpecEquality(actual_cluster_spec, expected_proto)
    # Use self.assertEqual (not a bare assert, which `python -O` strips)
    # for consistency with the other tests.
    self.assertEqual(os.environ['CUDA_VISIBLE_DEVICES'], '2,3')
# Run all test cases in this module under the TF test runner.
if __name__ == '__main__':
  test.main()