# Source blob: 938277324f1de6e38be82517d6b73b91c6c2829d (repository-viewer header, not part of the module).
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tensorflow.ops.tf.scatter_nd."""
import functools
import numpy as np
from tensorflow.compiler.tests import xla_test
from tensorflow.python.framework import errors
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.platform import test
def _AsType(v, vtype):
  """Converts `v` to `vtype`.

  Args:
    v: Either a `np.ndarray` or a value accepted by calling `vtype(v)`.
    vtype: Target type; used as the `astype` argument for arrays and called
      directly as a constructor for anything else.

  Returns:
    `v.astype(vtype)` for arrays, otherwise `vtype(v)`.
  """
  if isinstance(v, np.ndarray):
    return v.astype(vtype)
  return vtype(v)
def _FlatInnerDims(tensor, ndims=2):
  """Reshapes `tensor` by merging its leading axes into a single axis.

  The trailing `ndims - 1` axes are kept unchanged and every axis before them
  is collapsed into one leading axis.

  Args:
    tensor: Array-like object exposing `.shape` and `.reshape` (a
      `np.ndarray` in this file).
    ndims: Target rank of the result when the input has at least `ndims - 1`
      axes. `ndims=1` flattens the tensor completely.

  Returns:
    The reshaped array.
  """
  shape = list(tensor.shape)
  # `-ndims + 1` is 0 when ndims == 1; slicing at 0 would keep every axis and
  # prepend a unit axis instead of merging them all, so split at the end.
  split = len(shape) if ndims == 1 else -ndims + 1
  leading = functools.reduce(lambda x, y: x * y, shape[:split], 1)
  return tensor.reshape([leading] + shape[split:])
def _FlatOuterDims(tensor, ndims=2):
  """Reshapes `tensor` by merging its trailing axes into a single axis.

  The first `ndims - 1` axes are kept unchanged and all remaining axes are
  collapsed into one final axis.

  Args:
    tensor: Array-like object exposing `.shape` and `.reshape`.
    ndims: Number of axes in the result (leading axes kept plus one merged
      trailing axis).

  Returns:
    The reshaped array.
  """
  dims = list(tensor.shape)
  kept = ndims - 1
  # Product of the trailing extents; stays 1 when there is nothing to merge.
  merged = 1
  for extent in dims[kept:]:
    merged *= extent
  return tensor.reshape(dims[:kept] + [merged])
def _NumpyScatterNd(ref, indices, updates, op):
  """NumPy reference implementation of an N-d scatter.

  Args:
    ref: Array receiving the updates. Writes go through a reshaped version of
      `ref`, so `ref` itself may be modified.
    indices: Integer array whose last axis holds coordinates into the leading
      axes of `ref`.
    updates: Array reshapeable to `(number of index rows, slice size)`.
    op: Callable `op(current, update)` returning the value to store at the
      addressed slice.

  Returns:
    The scattered result, reshaped to `ref.shape`.
  """
  index_depth = indices.shape[-1]
  update_count = indices.size // index_depth

  # Number of elements in one addressed slice: the product of the ref axes
  # that the index coordinates do not cover.
  elems_per_slice = 1
  for extent in ref.shape[index_depth:]:
    elems_per_slice *= extent

  index_rows = _FlatInnerDims(indices)
  update_rows = updates.reshape((update_count, elems_per_slice))
  target = _FlatOuterDims(ref, index_depth + 1)

  for coords, update_row in zip(index_rows, update_rows):
    slot = tuple(coords)
    target[slot] = op(target[slot], update_row)
  return target.reshape(ref.shape)
def _NumpyUpdate(indices, updates, shape):
  """Scatters `updates` into a fresh zero array of the given `shape`.

  Args:
    indices: Index array forwarded to `_NumpyScatterNd`.
    updates: Update values; their dtype is used for the zero array.
    shape: Shape of the zero-initialised output.

  Returns:
    The result of `_NumpyScatterNd` where each addressed slice is replaced by
    its update.
  """

  def _overwrite(unused_current, update):
    return update

  zeros = np.zeros(shape, dtype=updates.dtype)
  return _NumpyScatterNd(zeros, indices, updates, _overwrite)
class ScatterNdTest(xla_test.XLATestCase):
  """Exercises `array_ops.scatter_nd` inside the XLA test scope."""

  def _VariableRankTest(self,
                        np_scatter,
                        tf_scatter,
                        vtype,
                        itype,
                        repeat_indices=False):
    """Compares `tf_scatter` against `np_scatter` over several shape pairs.

    Args:
      np_scatter: Reference callable `(indices, updates, shape) -> array`.
      tf_scatter: Callable under test with the same signature.
      vtype: Type used for the update values.
      itype: Type used for the indices.
      repeat_indices: If true (and there is more than one update), half of the
        index rows are replaced by duplicates of the other half.
    """
    np.random.seed(8)
    # (ref_shape, indices_shape) pairs. Only the first and last entries of
    # indices_shape are read: number of updates and coordinate depth.
    cases = [
        ((3, 6), (2,)),
        ((3, 6), (2, 2)),
        ((3, 6, 9), (2,)),
        ((3, 6, 9), (2, 2)),
        ((3, 6, 9), (2, 3)),
        ((3, 6, 9), (2, 3, 3)),
    ]
    for ref_shape, indices_shape in cases:
      num_updates = indices_shape[0]
      ixdim = indices_shape[-1]

      # Enumerate every coordinate addressable by `ixdim` leading axes, then
      # pick a random subset of them as the indices.
      addressable_shape = tuple(ref_shape[:ixdim])
      candidate_coords = [
          list(position)
          for position, _ in np.ndenumerate(
              np.empty(addressable_shape, vtype))
      ]
      np.random.shuffle(candidate_coords)
      indices = np.array(candidate_coords[:num_updates])

      if repeat_indices and num_updates > 1:
        half = num_updates // 2
        indices = indices[:half]
        for _ in range(num_updates - half):
          duplicate = indices[np.random.randint(half)]
          indices = np.append(indices, [duplicate], axis=0)
        np.random.shuffle(indices)
      indices = _AsType(indices[:num_updates], itype)

      # One update slice per index row, covering the un-indexed ref axes.
      updates_shape = (num_updates,) + tuple(ref_shape[ixdim:])
      updates = _AsType(np.random.randn(*updates_shape), vtype)

      # NumPy reference first, then the implementation under test.
      reference = np_scatter(indices, updates, ref_shape)
      actual = tf_scatter(indices, updates, ref_shape)

      self.assertAllClose(reference, actual)

  def _VariableRankTests(self, np_scatter, tf_scatter):
    """Runs `_VariableRankTest` for each numeric type and int32/int64 index."""
    for vtype in self.numeric_types:
      index_types = {np.int32, np.int64}.intersection(set(self.int_types))
      for itype in index_types:
        self._VariableRankTest(np_scatter, tf_scatter, vtype, itype)

  def _runScatterNd(self, indices, updates, shape):
    """Evaluates `array_ops.scatter_nd` with placeholders fed from NumPy.

    Args:
      indices: NumPy array of indices.
      updates: NumPy array of update values.
      shape: Output shape passed to `scatter_nd`.

    Returns:
      The evaluated output of the scatter op.
    """
    with self.session():
      updates_in = array_ops.placeholder(updates.dtype)
      indices_in = array_ops.placeholder(indices.dtype)
      # Only the scatter op itself is built inside the test scope.
      with self.test_scope():
        scattered = array_ops.scatter_nd(indices_in, updates_in, shape)
      return scattered.eval(
          feed_dict={updates_in: updates, indices_in: indices})

  def testSimple(self):
    # Four scalar updates written to distinct slots of a length-8 vector.
    idx = np.array([[4], [3], [1], [7]], dtype=np.int32)
    vals = np.array([9, 10, 11, 12], dtype=np.float32)
    want = np.array([0, 11, 0, 10, 9, 0, 0, 12], dtype=np.int32)
    self.assertAllEqual(want, self._runScatterNd(idx, vals, [8]))

  def testRepeatedIndices(self):
    # Each index appears twice; the expected output holds the per-index sums.
    idx = np.array([[0], [1], [0], [1]], dtype=np.int32)
    vals = np.array([9, 10, 11, 12], dtype=np.float32)
    want = np.array([20, 22], dtype=np.int32)
    self.assertAllEqual(want, self._runScatterNd(idx, vals, [2]))

  def testSimple2(self):
    # Full coordinates: each index row addresses a single element.
    idx = np.array([[1, 0], [1, 1]], dtype=np.int32)
    vals = np.array([11., 12.], dtype=np.float32)
    want = np.array([[0., 0.], [11., 12.], [0., 0.]], dtype=np.float32)
    self.assertAllEqual(want, self._runScatterNd(idx, vals, [3, 2]))

  def testSimple3(self):
    # Partial coordinates: the single index row addresses a whole row.
    idx = np.array([[1]], dtype=np.int32)
    vals = np.array([[11., 12.]], dtype=np.float32)
    want = np.array([[0., 0.], [11., 12.], [0., 0.]])
    self.assertAllEqual(want, self._runScatterNd(idx, vals, [3, 2]))

  def testVariableRankUpdate(self):
    self._VariableRankTests(_NumpyUpdate, self._runScatterNd)

  def testExtraIndicesDimensions(self):
    # Indices carry an extra leading unit axis; everything is zero.
    idx = np.zeros([1, 1, 2], np.int32)
    vals = np.zeros([1, 1], np.int32)
    want = np.zeros([2, 2], dtype=np.int32)
    self.assertAllEqual(want, self._runScatterNd(idx, vals, [2, 2]))

  @test_util.disable_mlir_bridge("Error messages differ")
  def testRank3InvalidShape1(self):
    idx = np.zeros([3, 2, 2], np.int32)
    vals = np.zeros([2, 2, 2], np.int32)
    with self.assertRaisesWithPredicateMatch(errors.InvalidArgumentError,
                                             "Must have updates.shape"):
      self._runScatterNd(idx, vals, [2, 2, 2])

  @test_util.disable_mlir_bridge("Error messages differ")
  def testRank3InvalidShape2(self):
    idx = np.zeros([2, 2, 1], np.int32)
    vals = np.zeros([2, 2], np.int32)
    with self.assertRaisesWithPredicateMatch(errors.InvalidArgumentError,
                                             "Must have updates.shape"):
      self._runScatterNd(idx, vals, [2, 2, 2])

  def testScatterOutOfRange(self):
    vals = np.array([-3, -4, -5]).astype(np.float32)
    index_sets = [
        # Every index in range: no problem.
        [[2], [0], [5]],
        # Out-of-range indices must not fail; the output they produce is
        # implementation-defined, so only successful execution is checked.
        [[-1], [0], [5]],
        [[2], [0], [6]],
    ]
    for rows in index_sets:
      self._runScatterNd(np.array(rows, dtype=np.int32), vals, [6])
class ScatterNdTensorTest(xla_test.XLATestCase):
  """Checks the tensor_scatter_* ops on a length-8 vector of ones."""

  def _runScatter(self, op):
    """Applies `op(tensor, indices, updates)` to fixed inputs and runs it.

    Args:
      op: Callable taking a ones tensor, an indices placeholder and an updates
        placeholder, returning the tensor to evaluate.

    Returns:
      The evaluated result of `op`.
    """
    idx_values = np.array([[4], [3], [1], [7]], dtype=np.int32)
    upd_values = np.array([9, 10, 11, 12], dtype=np.float32)
    with self.session() as sess, self.test_scope():
      idx_in = array_ops.placeholder(idx_values.dtype, shape=idx_values.shape)
      upd_in = array_ops.placeholder(upd_values.dtype, shape=upd_values.shape)
      base = array_ops.ones([8], dtype=np.float32)

      result = op(base, idx_in, upd_in)
      return sess.run(
          result, feed_dict={idx_in: idx_values, upd_in: upd_values})

  def testAdd(self):
    got = self._runScatter(array_ops.tensor_scatter_add)
    self.assertAllEqual(
        got, np.array([1, 12, 1, 11, 10, 1, 1, 13], dtype=np.float32))

  def testSub(self):
    got = self._runScatter(array_ops.tensor_scatter_sub)
    self.assertAllEqual(
        got, np.array([1, -10, 1, -9, -8, 1, 1, -11], dtype=np.float32))

  def testUpdate(self):
    got = self._runScatter(array_ops.tensor_scatter_update)
    self.assertAllEqual(
        got, np.array([1, 11, 1, 10, 9, 1, 1, 12], dtype=np.float32))
class ScatterNdTensorScalarUpdateTest(xla_test.XLATestCase):
  """Checks the tensor_scatter_* ops when the update is a single scalar."""

  def _runScatter(self, op):
    """Applies `op(tensor, indices, updates)` with a scalar update and runs it.

    Args:
      op: Callable taking a ones tensor, an indices placeholder and a scalar
        updates placeholder, returning the tensor to evaluate.

    Returns:
      The evaluated result of `op`.
    """
    idx_values = np.array([[4], [3], [1], [7]], dtype=np.int32)
    # Zero-dimensional update: one value shared by every index row.
    upd_value = np.array(9, dtype=np.float32)
    with self.session() as sess, self.test_scope():
      idx_in = array_ops.placeholder(idx_values.dtype, shape=idx_values.shape)
      upd_in = array_ops.placeholder(upd_value.dtype, shape=upd_value.shape)
      base = array_ops.ones([8], dtype=np.float32)

      result = op(base, idx_in, upd_in)
      return sess.run(
          result, feed_dict={idx_in: idx_values, upd_in: upd_value})

  def testUpdate(self):
    got = self._runScatter(array_ops.tensor_scatter_update)
    self.assertAllEqual(
        got, np.array([1, 9, 1, 9, 9, 1, 1, 9], dtype=np.float32))

  def testAdd(self):
    got = self._runScatter(array_ops.tensor_scatter_add)
    self.assertAllEqual(
        got, np.array([1, 10, 1, 10, 10, 1, 1, 10], dtype=np.float32))
# Run the tests through `test.main()` (from tensorflow.python.platform) when
# this file is executed directly rather than imported.
if __name__ == "__main__":
  test.main()