blob: ed9ef493ec31566a4a07b9373e66f127d7538b1a [file] [log] [blame]
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Cluster Resolvers are used for dynamic cluster IP/hostname resolution."""
import abc
import collections
import six
from tensorflow.python.client import session
from tensorflow.python.eager import context
from tensorflow.python.framework import config
from tensorflow.python.framework import ops
from tensorflow.python.training.server_lib import ClusterSpec
from tensorflow.python.util.tf_export import tf_export
def format_master_url(master, rpc_layer=None):
if rpc_layer:
return '%s://%s' % (rpc_layer, master)
else:
return master
def get_accelerator_devices(master, config_proto):
"""Returns accelerator devices given a master and a configuration."""
if context.executing_eagerly():
logical_devices = config.list_logical_devices()
devices = []
for d in logical_devices:
if d.device_type == 'CPU' or d.device_type == 'XLA_CPU': # Filter CPUs
continue
devices.append(session._DeviceAttributes(d.name, d.device_type, 0, 0)) # pylint: disable=protected-access
return devices
else:
with ops.Graph().as_default():
with session.Session(master, config=config_proto) as s:
devices = s.list_devices()
return devices
@tf_export('distribute.cluster_resolver.ClusterResolver')
@six.add_metaclass(abc.ABCMeta)
class ClusterResolver(object):
"""Abstract class for all implementations of ClusterResolvers.
This defines the skeleton for all implementations of ClusterResolvers.
ClusterResolvers are a way for TensorFlow to communicate with various cluster
management systems (e.g. GCE, AWS, etc...) and gives TensorFlow necessary
information to set up distributed training.
By letting TensorFlow communicate with these systems, we will be able to
automatically discover and resolve IP addresses for various TensorFlow
workers. This will eventually allow us to automatically recover from
underlying machine failures and scale TensorFlow worker clusters up and down.
Note to Implementors of `tf.distribute.cluster_resolver.ClusterResolver`
subclass: In addition to these abstract methods, when task_type, task_id, and
rpc_layer attributes are applicable, you should also implement them either as
properties with getters or setters, or directly set the attributes
`self._task_type`, `self._task_id`, or `self._rpc_layer` so the base class'
getters and setters are used. See
`tf.distribute.cluster_resolver.SimpleClusterResolver.__init__` for an
example.
In general, multi-client tf.distribute strategies such as
`tf.distribute.experimental.MultiWorkerMirroredStrategy` require task_type and
task_id properties to be available in the `ClusterResolver` they are using. On
the other hand, these concepts are not applicable in single-client strategies,
such as `tf.distribute.experimental.TPUStrategy`, because the program is only
expected to be run on one task, so there should not be a need to have code
branches according to task type and task id.
- task_type is the name of the server's current named job (e.g. 'worker',
'ps' in a distributed parameterized training job).
- task_id is the ordinal index of the server within the task type.
- rpc_layer is the protocol used by TensorFlow to communicate with other
TensorFlow servers in a distributed environment.
"""
@abc.abstractmethod
def cluster_spec(self):
"""Retrieve the current state of the cluster and return a `tf.train.ClusterSpec`.
Returns:
A `tf.train.ClusterSpec` representing the state of the cluster at the
moment this function is called.
Implementors of this function must take care in ensuring that the
ClusterSpec returned is up-to-date at the time of calling this function.
This usually means retrieving the information from the underlying cluster
management system every time this function is invoked and reconstructing
a cluster_spec, rather than attempting to cache anything.
"""
raise NotImplementedError()
@abc.abstractmethod
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Retrieves the name or URL of the session master.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.
Returns:
The name or URL of the session master.
Implementors of this function must take care in ensuring that the master
returned is up-to-date at the time to calling this function. This usually
means retrieving the master every time this function is invoked.
"""
raise NotImplementedError()
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
"""Returns the number of accelerator cores per worker.
This returns the number of accelerator cores (such as GPUs and TPUs)
available per worker.
Optionally, we allow callers to specify the task_type, and task_id, for
if they want to target a specific TensorFlow task to query
the number of accelerators. This is to support heterogenous environments,
where the number of accelerators cores per host is different.
Args:
task_type: (Optional) The type of the TensorFlow task of the machine we
want to query.
task_id: (Optional) The index of the TensorFlow task of the machine we
want to query.
config_proto: (Optional) Configuration for starting a new session to
query how many accelerator cores it has.
Returns:
A map of accelerator types to number of cores.
"""
master = self.master(task_type, task_id)
# TODO(b/126786766): in eager mode, we should check whether
# `tf.config.experimental_connect_to_cluster` is called or not.
devices = get_accelerator_devices(master, config_proto)
mapping = collections.defaultdict(int)
for device in devices:
if task_type is not None and task_id is not None:
job_path = '/job:%s' % task_type
task_path = '/task:%s' % task_id
if job_path not in device.name or task_path not in device.name:
continue
mapping[device.device_type] += 1
return mapping
@property
def environment(self):
"""Returns the current environment which TensorFlow is running in.
There are two possible return values, "google" (when TensorFlow is running
in a Google-internal environment) or an empty string (when TensorFlow is
running elsewhere).
If you are implementing a ClusterResolver that works in both the Google
environment and the open-source world (for instance, a TPU ClusterResolver
or similar), you will have to return the appropriate string depending on the
environment, which you will have to detect.
Otherwise, if you are implementing a ClusterResolver that will only work
in open-source TensorFlow, you do not need to implement this property.
"""
return ''
@property
def task_type(self):
"""Returns the task type this `ClusterResolver` indicates.
In TensorFlow distributed environment, each job may have an applicable
task type. Valid task types in TensorFlow include
'chief': a worker that is designated with more responsibility,
'worker': a regular worker for training/evaluation,
'ps': a parameter server, or
'evaluator': an evaluator that evaluates the checkpoints for metrics.
See [Multi-worker configuration](
https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration)
for more information about 'chief' and 'worker' task type, which are most
commonly used.
Having access to such information is useful when user needs to run specific
code according to task types. For example,
```python
cluster_spec = tf.train.ClusterSpec({
"ps": ["localhost:2222", "localhost:2223"],
"worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
})
# SimpleClusterResolver is used here for illustration; other cluster
# resolvers may be used for other source of task type/id.
simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
task_id=1)
...
if cluster_resolver.task_type == 'worker':
# Perform something that's only applicable on workers. This block
# will run on this particular instance since we've specified this task to
# be a worker in above cluster resolver.
elif cluster_resolver.task_type == 'ps':
# Perform something that's only applicable on parameter servers. This
# block will not run on this particular instance.
```
Returns `None` if such information is not available or is not applicable
in the current distributed environment, such as training with
`tf.distribute.experimental.TPUStrategy`.
For more information, please see
`tf.distribute.cluster_resolver.ClusterResolver`'s class doc.
"""
return getattr(self, '_task_type', None)
@property
def task_id(self):
"""Returns the task id this `ClusterResolver` indicates.
In TensorFlow distributed environment, each job may have an applicable
task id, which is the index of the instance within its task type. This is
useful when user needs to run specific code according to task index. For
example,
```python
cluster_spec = tf.train.ClusterSpec({
"ps": ["localhost:2222", "localhost:2223"],
"worker": ["localhost:2224", "localhost:2225", "localhost:2226"]
})
# SimpleClusterResolver is used here for illustration; other cluster
# resolvers may be used for other source of task type/id.
simple_resolver = SimpleClusterResolver(cluster_spec, task_type="worker",
task_id=0)
...
if cluster_resolver.task_type == 'worker' and cluster_resolver.task_id == 0:
# Perform something that's only applicable on 'worker' type, id 0. This
# block will run on this particular instance since we've specified this
# task to be a 'worker', id 0 in above cluster resolver.
else:
# Perform something that's only applicable on other ids. This block will
# not run on this particular instance.
```
Returns `None` if such information is not available or is not applicable
in the current distributed environment, such as training with
`tf.distribute.cluster_resolver.TPUClusterResolver`.
For more information, please see
`tf.distribute.cluster_resolver.ClusterResolver`'s class docstring.
"""
return getattr(self, '_task_id', None)
@task_type.setter
def task_type(self, task_type):
"""Setter of `task_type` property. See `task_type` property doc."""
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
"""Setter of `task_id` property. See `task_type` property doc."""
self._task_id = task_id
@tf_export('distribute.cluster_resolver.SimpleClusterResolver')
class SimpleClusterResolver(ClusterResolver):
"""Simple implementation of ClusterResolver that accepts all attributes.
Please see the base class for documentation of arguments of its constructor.
It is useful if you want to specify some or all attributes.
Usage example with `tf.distribute.Strategy`:
```Python
cluster = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
"worker1.example.com:2222"]})
# On worker 0
cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
task_id=0,
num_accelerators={"GPU": 8},
rpc_layer="grpc")
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
cluster_resolver=cluster_resolver)
# On worker 1
cluster_resolver = SimpleClusterResolver(cluster, task_type="worker",
task_id=1,
num_accelerators={"GPU": 8},
rpc_layer="grpc")
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
cluster_resolver=cluster_resolver)
```
"""
def __init__(self, cluster_spec, master='', task_type=None, task_id=None,
environment='', num_accelerators=None,
rpc_layer=None):
"""Creates a SimpleClusterResolver from a ClusterSpec."""
super(SimpleClusterResolver, self).__init__()
self._task_type = task_type
self._task_id = task_id
self._environment = environment
self._num_accelerators = num_accelerators
self._rpc_layer = rpc_layer
if not isinstance(cluster_spec, ClusterSpec):
raise TypeError('cluster_spec must be a `tf.train.ClusterSpec`.')
self._cluster_spec = cluster_spec
if not isinstance(master, str):
raise TypeError('master must be a string.')
self._master = master
def cluster_spec(self):
"""Returns the ClusterSpec passed into the constructor."""
return self._cluster_spec
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Returns the master address to use when creating a session.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC used by distributed TensorFlow.
Returns:
The name or URL of the session master.
If a task_type and task_id is given, this will override the `master`
string passed into the initialization function.
"""
if task_type is not None and task_id is not None:
master = self.cluster_spec().task_address(task_type, task_id)
else:
master = self._master
return format_master_url(master, rpc_layer=rpc_layer or self._rpc_layer)
@property
def task_type(self):
return self._task_type
@property
def task_id(self):
return self._task_id
@task_type.setter
def task_type(self, task_type):
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
self._task_id = task_id
@property
def environment(self):
return self._environment
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
"""Returns the number of accelerator cores per worker.
The SimpleClusterResolver does not do automatic detection of accelerators,
and thus all arguments are unused and we simply return the value provided
in the constructor.
Args:
task_type: Unused.
task_id: Unused.
config_proto: Unused.
"""
# Unused
del task_type, task_id, config_proto
if self._num_accelerators is None:
return {}
return self._num_accelerators
@property
def rpc_layer(self):
return self._rpc_layer
@rpc_layer.setter
def rpc_layer(self, rpc_layer):
self._rpc_layer = rpc_layer
@tf_export('distribute.cluster_resolver.UnionResolver')
class UnionClusterResolver(ClusterResolver):
"""Performs a union on underlying ClusterResolvers.
This class performs a union given two or more existing ClusterResolvers. It
merges the underlying ClusterResolvers, and returns one unified ClusterSpec
when cluster_spec is called. The details of the merge function is
documented in the cluster_spec function.
For additional ClusterResolver properties such as task type, task index,
rpc layer, environment, etc..., we will return the value from the first
ClusterResolver in the union.
An example to combine two cluster resolvers:
```Python
cluster_0 = tf.train.ClusterSpec({"worker": ["worker0.example.com:2222",
"worker1.example.com:2222"]})
cluster_resolver_0 = SimpleClusterResolver(cluster, task_type="worker",
task_id=0,
rpc_layer="grpc")
cluster_1 = tf.train.ClusterSpec({"ps": ["ps0.example.com:2222",
"ps1.example.com:2222"]})
cluster_resolver_1 = SimpleClusterResolver(cluster, task_type="ps",
task_id=0,
rpc_layer="grpc")
# Its task type would be "worker".
cluster_resolver = UnionClusterResolver(cluster_resolver_0,
cluster_resolver_1)
```
An example to override the number of GPUs in a TFConfigClusterResolver
instance:
```Python
tf_config = TFConfigClusterResolver()
gpu_override = SimpleClusterResolver(tf_config.cluster_spec(),
num_accelerators={"GPU": 1})
cluster_resolver = UnionResolver(gpu_override, tf_config)
```
"""
def __init__(self, *args, **kwargs):
"""Initializes a UnionClusterResolver with other ClusterResolvers.
Args:
*args: `ClusterResolver` objects to be unionized.
**kwargs:
rpc_layer - (Optional) Override value for the RPC layer used by
TensorFlow.
task_type - (Optional) Override value for the current task type.
task_id - (Optional) Override value for the current task index.
Raises:
TypeError: If any argument is not a subclass of `ClusterResolvers`.
ValueError: If there are no arguments passed.
"""
super(UnionClusterResolver, self).__init__()
self._rpc_layer = kwargs.pop('rpc_layer', None)
self._task_type = kwargs.pop('task_type', None)
self._task_id = kwargs.pop('task_id', None)
if kwargs:
raise ValueError('Unexpected kwargs provided {!r}'.format(kwargs))
if not args:
raise ValueError('At least one ClusterResolver is required.')
for cluster_resolver in args:
if not isinstance(cluster_resolver, ClusterResolver):
raise TypeError('All arguments must be a sub-class of '
'`ClusterResolver.`')
self._cluster_resolvers = args
def cluster_spec(self):
"""Returns a union of all the ClusterSpecs from the ClusterResolvers.
Returns:
A ClusterSpec containing host information merged from all the underlying
ClusterResolvers.
Raises:
KeyError: If there are conflicting keys detected when merging two or
more dictionaries, this exception is raised.
Note: If there are multiple ClusterResolvers exposing ClusterSpecs with the
same job name, we will merge the list/dict of workers.
If *all* underlying ClusterSpecs expose the set of workers as lists, we will
concatenate the lists of workers, starting with the list of workers from
the first ClusterResolver passed into the constructor.
If *any* of the ClusterSpecs expose the set of workers as a dict, we will
treat all the sets of workers as dicts (even if they are returned as lists)
and will only merge them into a dict if there is no conflicting keys. If
there is a conflicting key, we will raise a `KeyError`.
"""
merged_cluster = {}
# We figure out whether it is all lists for a particular job, or whether
# there are dicts inside.
for cluster_resolver in self._cluster_resolvers:
cluster_spec = cluster_resolver.cluster_spec()
cluster_dict = cluster_spec.as_dict()
for job_name, tasks in cluster_dict.items():
if job_name in merged_cluster:
# If we see a dict, then we write a dict out regardless.
if isinstance(tasks, dict):
merged_cluster[job_name] = {}
else:
# We take whichever type is present.
if isinstance(tasks, list):
merged_cluster[job_name] = []
else:
merged_cluster[job_name] = {}
# We then do the merge as appropriate in merged_cluster[job].
for cluster_resolver in self._cluster_resolvers:
cluster_spec = cluster_resolver.cluster_spec()
cluster_dict = cluster_spec.as_dict()
for job_name, tasks in cluster_dict.items():
if isinstance(merged_cluster[job_name], list):
# We all have lists, we can just concatenate and be done.
merged_cluster[job_name].extend(tasks)
else:
if isinstance(tasks, list):
# We convert to a dictionary if the type is a list.
task_dict = dict(zip(range(0, len(tasks)), tasks))
else:
# We can simply make a copy (for update) and be done.
task_dict = tasks.copy()
# We detect if there are duplicates, and raise an error if so.
task_keys = set(task_dict)
merged_keys = set(merged_cluster[job_name].keys())
intersected_keys = task_keys.intersection(merged_keys)
if intersected_keys:
raise KeyError('Duplicate keys detected when merging two '
'ClusterSpecs: %s' % repr(intersected_keys))
# We do the merge after all the processing.
merged_cluster[job_name].update(task_dict)
return ClusterSpec(merged_cluster)
def master(self, task_type=None, task_id=None, rpc_layer=None):
"""Returns the master address to use when creating a session.
This usually returns the master from the first ClusterResolver passed in,
but you can override this by specifying the task_type and task_id.
Note: this is only useful for TensorFlow 1.x.
Args:
task_type: (Optional) The type of the TensorFlow task of the master.
task_id: (Optional) The index of the TensorFlow task of the master.
rpc_layer: (Optional) The RPC protocol for the given cluster.
Returns:
The name or URL of the session master.
"""
if task_type is not None and task_id is not None:
master = self.cluster_spec().task_address(task_type, task_id)
return format_master_url(master, rpc_layer or self._rpc_layer)
return self._cluster_resolvers[0].master(rpc_layer=rpc_layer)
@property
def task_type(self):
return self._task_type or self._cluster_resolvers[0].task_type
@property
def task_id(self):
return self._task_id or self._cluster_resolvers[0].task_id
@task_type.setter
def task_type(self, task_type):
self._task_type = task_type
@task_id.setter
def task_id(self, task_id):
self._task_id = task_id
@property
def environment(self):
return self._cluster_resolvers[0].environment
def num_accelerators(self,
task_type=None,
task_id=None,
config_proto=None):
return self._cluster_resolvers[0].num_accelerators(
task_type, task_id, config_proto)
@property
def rpc_layer(self):
return self._rpc_layer or self._cluster_resolvers[0].rpc_layer
@rpc_layer.setter
def rpc_layer(self, rpc_layer):
self._rpc_layer = rpc_layer