blob: bd84f0048b28189a522f14675760f7185c9b8003 [file] [log] [blame]
# Copyright 2022 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for TPU Embeddings mid level API on CPU."""
import numpy as np
from tensorflow.python.compat import v2_compat
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.ops import init_ops_v2
from tensorflow.python.ops.ragged import ragged_tensor
from tensorflow.python.platform import test
from tensorflow.python.tpu import tpu_embedding_for_serving
from tensorflow.python.tpu import tpu_embedding_v2_utils
from tensorflow.python.util import nest
class TPUEmbeddingForServingTest(test.TestCase):
def setUp(self):
super(TPUEmbeddingForServingTest, self).setUp()
self.embedding_values = np.array(list(range(32)), dtype=np.float64)
self.initializer = init_ops_v2.Constant(self.embedding_values)
# Embedding for video initialized to
# 0 1 2 3
# 4 5 6 7
# ...
self.table_video = tpu_embedding_v2_utils.TableConfig(
vocabulary_size=8,
dim=4,
initializer=self.initializer,
combiner='sum',
name='video')
# Embedding for user initialized to
# 0 1
# 2 3
# 4 5
# 6 7
# ...
self.table_user = tpu_embedding_v2_utils.TableConfig(
vocabulary_size=16,
dim=2,
initializer=self.initializer,
combiner='mean',
name='user')
self.feature_config = (
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_video, name='watched'),
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_video, name='favorited'),
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends'))
self.batch_size = 2
self.data_batch_size = 4
# One (global) batch of inputs
# sparse tensor for watched:
# row 0: 0
# row 1: 0, 1
# row 2: 0, 1
# row 3: 1
self.feature_watched_indices = [[0, 0], [1, 0], [1, 1],
[2, 0], [2, 1], [3, 0]]
self.feature_watched_values = [0, 0, 1, 0, 1, 1]
self.feature_watched_row_lengths = [1, 2, 2, 1]
# sparse tensor for favorited:
# row 0: 0, 1
# row 1: 1
# row 2: 0
# row 3: 0, 1
self.feature_favorited_indices = [[0, 0], [0, 1], [1, 0],
[2, 0], [3, 0], [3, 1]]
self.feature_favorited_values = [0, 1, 1, 0, 0, 1]
self.feature_favorited_row_lengths = [2, 1, 1, 2]
# sparse tensor for friends:
# row 0: 3
# row 1: 0, 1, 2
# row 2: 3
# row 3: 0, 1, 2
self.feature_friends_indices = [[0, 0], [1, 0], [1, 1], [1, 2],
[2, 0], [3, 0], [3, 1], [3, 2]]
self.feature_friends_values = [3, 0, 1, 2, 3, 0, 1, 2]
self.feature_friends_row_lengths = [1, 3, 1, 3]
def _create_mid_level(self):
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
return tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=self.feature_config, optimizer=optimizer)
def _get_dense_tensors(self, dtype=dtypes.int32):
feature0 = constant_op.constant(self.feature_watched_values, dtype=dtype)
feature1 = constant_op.constant(self.feature_favorited_values, dtype=dtype)
feature2 = constant_op.constant(self.feature_friends_values, dtype=dtype)
return (feature0, feature1, feature2)
def test_cpu_dense_lookup(self):
mid_level = self._create_mid_level()
features = self._get_dense_tensors()
results = mid_level(features, weights=None)
all_lookups = []
for feature, config in zip(nest.flatten(features), self.feature_config):
table = mid_level.embedding_tables[config.table].numpy()
all_lookups.append(table[feature.numpy()])
self.assertAllClose(results, nest.pack_sequence_as(results, all_lookups))
def test_cpu_dense_lookup_with_weights(self):
mid_level = self._create_mid_level()
features = self._get_dense_tensors()
weights = self._get_dense_tensors(dtype=dtypes.float32)
with self.assertRaisesRegex(
ValueError, 'Weight specified for .*, but input is dense.'):
mid_level(features, weights=weights)
def _get_sparse_tensors(self, dtype=dtypes.int32):
feature0 = sparse_tensor.SparseTensor(
indices=self.feature_watched_indices,
values=constant_op.constant(self.feature_watched_values, dtype=dtype),
dense_shape=[self.data_batch_size, 2])
feature1 = sparse_tensor.SparseTensor(
indices=self.feature_favorited_indices,
values=constant_op.constant(self.feature_favorited_values, dtype=dtype),
dense_shape=[self.data_batch_size, 2])
feature2 = sparse_tensor.SparseTensor(
indices=self.feature_friends_indices,
values=constant_op.constant(self.feature_friends_values, dtype=dtype),
dense_shape=[self.data_batch_size, 3])
return (feature0, feature1, feature2)
def test_cpu_sparse_lookup(self):
mid_level = self._create_mid_level()
features = self._get_sparse_tensors()
results = mid_level(features, weights=None)
reduced = []
for feature, config in zip(nest.flatten(features), self.feature_config):
table = mid_level.embedding_tables[config.table].numpy()
all_lookups = table[feature.values.numpy()]
# With row starts we can use reduceat in numpy. Get row starts from the
# ragged tensor API.
ragged = ragged_tensor.RaggedTensor.from_sparse(feature)
row_starts = ragged.row_starts().numpy()
reduced.append(np.add.reduceat(all_lookups, row_starts))
if config.table.combiner == 'mean':
# for mean, divide by the row lengths.
reduced[-1] /= np.expand_dims(ragged.row_lengths().numpy(), axis=1)
self.assertAllClose(results, nest.pack_sequence_as(results, reduced))
def test_cpu_sparse_lookup_with_weights(self):
mid_level = self._create_mid_level()
features = self._get_sparse_tensors()
weights = self._get_sparse_tensors(dtype=dtypes.float32)
results = mid_level(features, weights=weights)
weighted_sum = []
for feature, weight, config in zip(nest.flatten(features),
nest.flatten(weights),
self.feature_config):
table = mid_level.embedding_tables[config.table].numpy()
# Expand dims here needed to broadcast this multiplication properly.
weight = np.expand_dims(weight.values.numpy(), axis=1)
all_lookups = table[feature.values.numpy()] * weight
# With row starts we can use reduceat in numpy. Get row starts from the
# ragged tensor API.
row_starts = ragged_tensor.RaggedTensor.from_sparse(feature).row_starts()
row_starts = row_starts.numpy()
weighted_sum.append(np.add.reduceat(all_lookups, row_starts))
if config.table.combiner == 'mean':
weighted_sum[-1] /= np.add.reduceat(weight, row_starts)
self.assertAllClose(results, nest.pack_sequence_as(results,
weighted_sum))
def test_cpu_sparse_lookup_with_non_sparse_weights(self):
mid_level = self._create_mid_level()
features = self._get_sparse_tensors()
weights = self._get_dense_tensors(dtype=dtypes.float32)
with self.assertRaisesRegex(
ValueError, 'but it does not match type of the input which is'):
mid_level(features, weights=weights)
def _get_ragged_tensors(self, dtype=dtypes.int32):
feature0 = ragged_tensor.RaggedTensor.from_row_lengths(
values=constant_op.constant(self.feature_watched_values, dtype=dtype),
row_lengths=self.feature_watched_row_lengths)
feature1 = ragged_tensor.RaggedTensor.from_row_lengths(
values=constant_op.constant(self.feature_favorited_values, dtype=dtype),
row_lengths=self.feature_favorited_row_lengths)
feature2 = ragged_tensor.RaggedTensor.from_row_lengths(
values=constant_op.constant(self.feature_friends_values, dtype=dtype),
row_lengths=self.feature_friends_row_lengths)
return (feature0, feature1, feature2)
def test_cpu_ragged_lookup_with_weights(self):
mid_level = self._create_mid_level()
features = self._get_ragged_tensors()
weights = self._get_ragged_tensors(dtype=dtypes.float32)
results = mid_level(features, weights=weights)
weighted_sum = []
for feature, weight, config in zip(nest.flatten(features),
nest.flatten(weights),
self.feature_config):
table = mid_level.embedding_tables[config.table].numpy()
# Expand dims here needed to broadcast this multiplication properly.
weight = np.expand_dims(weight.values.numpy(), axis=1)
all_lookups = table[feature.values.numpy()] * weight
row_starts = feature.row_starts().numpy()
weighted_sum.append(np.add.reduceat(all_lookups, row_starts))
if config.table.combiner == 'mean':
weighted_sum[-1] /= np.add.reduceat(weight, row_starts)
self.assertAllClose(results, nest.pack_sequence_as(results,
weighted_sum))
def test_cpu_invalid_structure_for_features(self):
mid_level = self._create_mid_level()
# Remove one element of the tuple, self.feature_config has 3 so we need to
# pass 3.
features = tuple(self._get_sparse_tensors()[:2])
with self.assertRaises(ValueError):
mid_level(features, weights=None)
def test_cpu_invalid_structure_for_weights(self):
mid_level = self._create_mid_level()
features = self._get_sparse_tensors()
# Remove one element of the tuple, self.feature_config has 3 so we need to
# pass 3 (or None).
weights = tuple(self._get_dense_tensors(dtype=dtypes.float32)[:2])
with self.assertRaises(ValueError):
mid_level(features, weights=weights)
def _numpy_sequence_lookup(self, table, indices, values, batch_size,
max_sequence_length, dim):
# First we truncate to max_sequence_length.
valid_entries = np.nonzero(indices[:, 1] < max_sequence_length)[0]
indices = indices[valid_entries]
values = values[valid_entries]
# Then we gather the values
lookup = table[values]
# Then we scatter them into the result array.
scatter_result = np.zeros([batch_size, max_sequence_length, dim])
for i, index in enumerate(indices):
scatter_result[index[0], index[1], :] = lookup[i]
return scatter_result
def test_cpu_sequence_lookup_sparse(self):
feature_config = (
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', max_sequence_length=2),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
features = self._get_sparse_tensors()[2:3]
result = mid_level(features, weights=None)
golden = self._numpy_sequence_lookup(
mid_level.embedding_tables[self.table_user].numpy(),
features[0].indices.numpy(),
features[0].values.numpy(),
self.data_batch_size,
feature_config[0].max_sequence_length,
self.table_user.dim)
self.assertAllClose(result[0], golden)
def test_cpu_sequence_lookup_ragged(self):
feature_config = (
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', max_sequence_length=2),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
features = self._get_ragged_tensors()[2:3]
result = mid_level(features, weights=None)
sparse_ver = features[0].to_sparse()
golden = self._numpy_sequence_lookup(
mid_level.embedding_tables[self.table_user].numpy(),
sparse_ver.indices.numpy(),
sparse_ver.values.numpy(),
self.data_batch_size,
feature_config[0].max_sequence_length,
self.table_user.dim)
self.assertAllClose(result[0], golden)
def test_cpu_high_dimensional_lookup_ragged(self):
feature_config = (tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', output_shape=[2, 2]),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
features = self._get_ragged_tensors()[2:3]
result = mid_level(features, weights=None)
self.assertAllClose(result[0].shape, (2, 2, 2))
def test_cpu_high_dimensional_sequence_lookup_ragged(self):
# Prod of output shape is a factor of the data batch size.
# The divide result will be the sequence length.
feature_config = (tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', output_shape=[2, 4]),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
features = self._get_ragged_tensors()[2:3]
result = mid_level(features, weights=None)
self.assertAllClose(result[0].shape, (2, 4, 2))
def test_cpu_high_dimensional_invalid_lookup_ragged(self):
# Prod of output shape is not a factor of the data batch size.
# An error will be raised in this case.
feature_config = (tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', output_shape=[3]),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
features = self._get_ragged_tensors()[2:3]
with self.assertRaisesRegex(
ValueError,
'Output shape set in the FeatureConfig should be the factor'):
mid_level(features, weights=None)
def test_cpu_no_optimizer(self):
feature_config = (
tpu_embedding_v2_utils.FeatureConfig(
table=self.table_video, name='watched', max_sequence_length=2),)
mid_level = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=None)
# Build the layer manually to create the variables. Normally calling enqueue
# would do this.
mid_level.build()
self.assertEqual(
list(mid_level._variables[self.table_video.name].keys()),
['parameters'])
def test_cpu_multiple_creation(self):
feature_config = (tpu_embedding_v2_utils.FeatureConfig(
table=self.table_user, name='friends', max_sequence_length=2),)
optimizer = tpu_embedding_v2_utils.SGD(learning_rate=0.1)
embedding_one = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
embedding_two = tpu_embedding_for_serving.TPUEmbeddingForServing(
feature_config=feature_config, optimizer=optimizer)
# Both of the tpu embedding tables should be able to build on cpu.
embedding_one.build()
embedding_two.build()
if __name__ == '__main__':
v2_compat.enable_v2_behavior()
test.main()