| # Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
| # |
# Licensed under the Apache License, Version 2.0 (the "License");
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
# distributed under the License is distributed on an "AS IS" BASIS,
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============================================================================== |
| """Tests for nccl ops. See also the cc test for nccl_communicator.""" |
| |
| from functools import partial |
| |
| import numpy as np |
| |
| from tensorflow.python.framework import errors |
| from tensorflow.python.framework import ops |
| from tensorflow.python.ops import array_ops |
| from tensorflow.python.ops import gradients |
| from tensorflow.python.ops import nccl_ops |
| from tensorflow.python.platform import test |
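
# A minimal sketch (not executed here) of the nccl_ops API these tests
# exercise, assuming at least two visible GPU devices:
#
#   with ops.device('/device:GPU:0'):
#     t0 = array_ops.identity(np.zeros((3, 4), np.float32))
#   with ops.device('/device:GPU:1'):
#     t1 = array_ops.identity(np.ones((3, 4), np.float32))
#   # all_sum returns one summed tensor per input, colocated with it.
#   summed = nccl_ops.all_sum([t0, t1])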
| |
| |
| def _DeviceTensors(tensors, devices): |
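  """Returns each tensor in `tensors` copied onto the corresponding device.

  The identity op pins each value to its device, so the NCCL ops built by the
  helpers below see one input per device.
  """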
| res = [] |
| for t, d in zip(tensors, devices): |
| with ops.device(d): |
| res.append(array_ops.identity(t)) |
| return res |
| |
| |
| def _NcclAllReduce(nccl_fun, tensors, devices): |
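  """Applies `nccl_fun` (e.g. nccl_ops.all_sum) to `tensors`, one per device."""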
| return nccl_fun(_DeviceTensors(tensors, devices)) |
| |
| |
| def _NcclReduce(nccl_fun, tensors, devices): |
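  """Reduces `tensors` onto a randomly chosen receiver device via `nccl_fun`."""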
| receiver = np.random.randint(0, len(devices)) |
| with ops.device(devices[receiver]): |
| return [nccl_fun(_DeviceTensors(tensors, devices))] |
| |
| |
| def _NcclBroadcast(tensors, devices): |
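  """Broadcasts `tensors[0]` from a randomly chosen sender to all `devices`."""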
| sender = np.random.randint(0, len(devices)) |
| with ops.device(devices[sender]): |
| tensor = array_ops.identity(tensors[0]) |
| broadcast = nccl_ops.broadcast(tensor) |
| return _DeviceTensors([broadcast] * len(devices), devices) |
| |
| |
| class NcclTestCase(test.TestCase): |
| |
| def _Test(self, |
| nccl_reduce, |
| numpy_fn, |
| device_sets=(['/device:GPU:1', '/device:GPU:2', '/device:GPU:0'], |
| ['/device:GPU:1', '/device:GPU:0'])): |
| """Tests that nccl_reduce does the same as reduction with numpy_fn. |
| |
| Args: |
| nccl_reduce: A function taking a list of tensors and a list of devices, |
| and returns a list of reduced tensors and a list of ops to perform the |
| reduction. |
| numpy_fn: A function taking two tensors and returning the reduction of the |
| two. |
| device_sets: Tuple of virtual devices to run test on. |
| """ |
| for dtype in [np.float16, np.float32, np.int32, np.int64, np.float64]: |
      # Create the session inside the outer loop to test reuse of the same
      # communicator across multiple sessions.
| with self.test_session(): |
| |
| for devices in device_sets: |
| shape = (3, 4) |
| random = (np.random.random_sample(shape) - .5) * 1024 |
| tensors = [] |
| for _ in devices: |
| tensors.append(random.astype(dtype)) |
| np_ans = tensors[0] |
| for t in tensors[1:]: |
| np_ans = numpy_fn(np_ans, t) |
| |
| reduce_tensors = nccl_reduce(tensors, devices) |
| self.assertNotEmpty(reduce_tensors) |
| |
| # Test shape inference. |
| for r in reduce_tensors: |
| self.assertEqual(shape, r.get_shape()) |
| |
| result_tensors = [array_ops.identity(t) for t in reduce_tensors] |
| |
| # Check GPU availability *after* creating session, see b/68975239. |
| if not test.is_gpu_available(): |
| # If no GPU is available, only test graph construction. |
| continue |
| |
| # Test execution and results. |
| for t in self.evaluate(result_tensors): |
| self.assertAllClose(t, np_ans) |
| |
| def _TestGradient(self, nccl_reduce, numpy_fn): |
| """Tests the gradient of nccl_reduce. |
| |
| Args: |
| nccl_reduce: A function taking a list of tensors and a list of devices, |
| and returns a list of reduced tensors and a list of ops to perform the |
| reduction. |
| numpy_fn: A function taking two tensors and returning the gradient of the |
| reduction of the two. |
| """ |
| |
| def _Gradient(tensors, devices): |
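      """Returns the gradients of `nccl_reduce` w.r.t. its placeholder inputs.

      The original `tensors` double as the incoming gradients (losses) for
      the reduced outputs.
      """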
| inputs = [array_ops.placeholder(t.dtype, t.shape) for t in tensors] |
| reduce_tensors = nccl_reduce(inputs, devices) |
| losses = _DeviceTensors(tensors, [t.device for t in reduce_tensors]) |
| grads = gradients.gradients( |
| reduce_tensors, inputs, losses, colocate_gradients_with_ops=True) |
| return [g for g in grads if g is not None] |
| |
| self._Test(_Gradient, numpy_fn) |
| |
| |
| class AllReduceTest(NcclTestCase): |
| |
| def testAllReduce(self): |
| self._Test(partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y) |
| self._Test(partial(_NcclAllReduce, nccl_ops.all_prod), lambda x, y: x * y) |
| self._Test(partial(_NcclAllReduce, nccl_ops.all_min), np.minimum) |
| self._Test(partial(_NcclAllReduce, nccl_ops.all_max), np.maximum) |
| |
| def testAllSumGrad(self): |
| self._TestGradient( |
| partial(_NcclAllReduce, nccl_ops.all_sum), lambda x, y: x + y) |
| |
| def testErrors(self): |
| with self.assertRaisesRegex(ValueError, 'Device assignment .* required'): |
| nccl_ops.all_sum([array_ops.identity(np.random.random_sample((3, 4)))]) |
| with self.assertRaisesRegex(ValueError, 'Must pass >0 tensors'): |
| nccl_ops.all_sum([]) |
| |
| |
| class SingleReduceTest(NcclTestCase): |
| |
| def testSum(self): |
| self._Test(partial(_NcclReduce, nccl_ops.reduce_sum), lambda x, y: x + y) |
| |
| def testSumGrad(self): |
| self._TestGradient(partial(_NcclReduce, nccl_ops.reduce_sum), |
| lambda x, y: x) |
| |
| |
| class BroadcastTest(NcclTestCase): |
| |
| def testBroadcast(self): |
| self._Test(_NcclBroadcast, lambda x, y: x) |
| |
| def testBroadcastSingleDevice(self): |
| # Broadcasts on a single device are removed completely during rewrite. |
| self._Test(_NcclBroadcast, lambda x, y: x, |
| (['/device:GPU:0', '/device:GPU:0'],)) |
| |
| def testBroadcastToCpuError(self): |
| try: |
      # Broadcasting to CPU is not supported.
| self._Test(_NcclBroadcast, lambda x, y: x, |
| (['/device:GPU:0', '/device:CPU:0'],)) |
| except errors.NotFoundError as e: |
| self.assertRegex( |
| str(e), "No registered '_NcclBroadcastRecv' OpKernel for CPU devices") |
| else: |
| # Session isn't executed when no GPU is available. |
| if test.is_gpu_available(): |
| self.fail("Didn't raise NotFoundError trying to broadcast to CPU") |
| |
| |
| class CombinedTest(NcclTestCase): |
| """Test all-reduce vs. single-reduce plus broadcast in one session.run.""" |
| |
| def _Combined(self, tensors, devices): |
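    """Builds an all-reduce and a single-reduce-plus-broadcast over `tensors`."""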
| all_reduce_tensors = _NcclAllReduce(nccl_ops.all_sum, tensors, devices) |
| single_reduce_tensors = _NcclReduce(nccl_ops.reduce_sum, tensors, devices) |
| broadcast_tensors = _NcclBroadcast(single_reduce_tensors, devices) |
| return all_reduce_tensors + broadcast_tensors |
| |
| def testCombined(self): |
| self._Test(self._Combined, lambda x, y: x + y) |
| |
| |
| if __name__ == '__main__': |
| test.main() |