# blob: db5791eafbaecc4e635dcac81edc4c3ef9a15045
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for compiler_opt.rl.log_reader."""
import ctypes
import enum
import json
from compiler_opt.rl import log_reader
# This is https://github.com/google/pytype/issues/764
from google.protobuf import text_format # pytype: disable=pyi-error
from typing import BinaryIO
import numpy as np
import tensorflow as tf
def json_to_bytes(d) -> bytes:
  """Serialize `d` to its JSON text form, encoded as UTF-8 bytes."""
  return bytes(json.dumps(d), encoding='utf-8')
class LogTestExampleBuilder:
  """Writes a synthetic log file, optionally corrupting one position.

  Each `write_*` method appends one piece of the log format to the opened
  file. When `introduce_error_pos` names a position, the newline that would
  normally terminate that piece is replaced with garbage bytes so readers
  can be tested against malformed input.
  """
  newline = b'\n'
  error_newline = b'hi there'

  class ErrorMarkers(enum.IntEnum):
    """Positions in the log where an error may be injected."""
    NONE = 0
    AFTER_HEADER = 1
    CTX_MARKER_POS = 2
    OBS_MARKER_POS = 3
    OUTCOME_MARKER_POS = 4
    TENSOR_BUF_POS = 5
    TENSORS_POS = 6
    OUTCOME_POS = 7

  def __init__(
      self,
      *,
      opened_file: BinaryIO,
      introduce_error_pos: ErrorMarkers = ErrorMarkers.NONE,
  ):
    self._opened_file = opened_file
    self._introduce_error_pos = introduce_error_pos

  def write_buff(self, buffer: list, ct):
    """Write `buffer` to the file as a packed array of ctype `ct`."""
    payload = buffer
    if self._introduce_error_pos == self.ErrorMarkers.TENSOR_BUF_POS:
      # Simulate a truncated tensor buffer by dropping the first half.
      payload = payload[len(payload) // 2:]
    # Build a ctypes array; its buffer-protocol bytes keep pytype happy.
    array_type = ct * len(payload)
    # pytype:disable=wrong-arg-types
    self._opened_file.write(array_type(*payload))
    # pytype:enable=wrong-arg-types

  def write_newline(self, position=None):
    """Terminate the piece written at `position`, or corrupt it on purpose."""
    if position == self._introduce_error_pos:
      self._opened_file.write(self.error_newline)
    else:
      self._opened_file.write(self.newline)

  def write_context_marker(self, name: str):
    self._opened_file.write(json_to_bytes({'context': name}))
    self.write_newline(self.ErrorMarkers.CTX_MARKER_POS)

  def write_observation_marker(self, obs_idx: int):
    self._opened_file.write(json_to_bytes({'observation': obs_idx}))
    self.write_newline(self.ErrorMarkers.OBS_MARKER_POS)

  def write_outcome_marker(self, obs_idx: int):
    self._opened_file.write(json_to_bytes({'outcome': obs_idx}))
    self.write_newline(self.ErrorMarkers.OUTCOME_MARKER_POS)

  def write_header(self, json_header: dict):
    self._opened_file.write(json_to_bytes(json_header))
def create_example(fname: str,
                   *,
                   nr_contexts=1,
                   introduce_errors_pos: LogTestExampleBuilder
                   .ErrorMarkers = LogTestExampleBuilder.ErrorMarkers.NONE):
  """Write a complete example log to `fname`.

  The log has `nr_contexts` contexts with 2 observations each. Feature
  values are shifted by `ctx_id * 10` per context and by +1 between the
  two observations; the scalar score grows by +1 per observation and is
  never reset across contexts. If `introduce_errors_pos` is set, the log
  is corrupted at that position (see LogTestExampleBuilder).
  """
  float_feature = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
  int_feature = [1, 2, 3]
  score = [1.2]
  with open(fname, 'wb') as log_file:
    writer = LogTestExampleBuilder(
        opened_file=log_file, introduce_error_pos=introduce_errors_pos)
    writer.write_header({
        'features': [{
            'name': 'tensor_name2',
            'port': 0,
            'shape': [2, 3],
            'type': 'float',
        }, {
            'name': 'tensor_name1',
            'port': 0,
            'shape': [3, 1],
            'type': 'int64_t',
        }],
        'score': {
            'name': 'reward',
            'port': 0,
            'shape': [1],
            'type': 'float'
        }
    })
    writer.write_newline(LogTestExampleBuilder.ErrorMarkers.AFTER_HEADER)
    for ctx_id in range(nr_contexts):
      # Shift the feature values so each context is distinguishable.
      float_feature = [v + ctx_id * 10 for v in float_feature]
      int_feature = [v + ctx_id * 10 for v in int_feature]
      writer.write_context_marker(f'context_nr_{ctx_id}')
      for obs in range(2):
        if obs:
          # Between the two observations of a context, bump everything by 1.
          float_feature = [v + 1 for v in float_feature]
          int_feature = [v + 1 for v in int_feature]
          score[0] += 1
        writer.write_observation_marker(obs)
        writer.write_buff(float_feature, ctypes.c_float)
        writer.write_buff(int_feature, ctypes.c_int64)
        writer.write_newline(LogTestExampleBuilder.ErrorMarkers.TENSORS_POS)
        writer.write_outcome_marker(obs)
        writer.write_buff(score, ctypes.c_float)
        writer.write_newline(LogTestExampleBuilder.ErrorMarkers.OUTCOME_POS)
class LogReaderTest(tf.test.TestCase):
  """Tests for log_reader, using logs produced by create_example above."""

  def test_create_tensorspec(self):
    """A json feature spec converts to the equivalent tf.TensorSpec."""
    ts = log_reader.create_tensorspec({
        'name': 'tensor_name',
        'port': 0,
        'shape': [2, 3],
        'type': 'float'
    })
    self.assertEqual(
        ts, tf.TensorSpec(name='tensor_name', shape=[2, 3], dtype=tf.float32))

  def test_read_header(self):
    """The header written by create_example round-trips via _read_header."""
    logfile = self.create_tempfile()
    create_example(logfile)
    with open(logfile, 'rb') as f:
      header = log_reader._read_header(f)  # pylint: disable=protected-access
      self.assertIsNotNone(header)
      # Disable attribute error because header is an Optional type, and pytype
      # on python 3.9 doesn't recognise that we already checked the Optional is
      # not None
      # pytype: disable=attribute-error
      self.assertEqual(header.features, [
          tf.TensorSpec(name='tensor_name2', shape=[2, 3], dtype=tf.float32),
          tf.TensorSpec(name='tensor_name1', shape=[3, 1], dtype=tf.int64)
      ])
      self.assertEqual(
          header.score,
          tf.TensorSpec(name='reward', shape=[1], dtype=tf.float32))
      # pytype: enable=attribute-error

  def test_read_header_empty_file(self):
    """An empty log yields no header rather than raising."""
    logfile = self.create_tempfile()
    with open(logfile, 'rb') as f:
      header = log_reader._read_header(f)  # pylint:disable=protected-access
      self.assertIsNone(header)

  def test_read_log(self):
    """read_log yields one record per observation with the expected score."""
    logfile = self.create_tempfile()
    create_example(logfile)
    obs_id = 0
    for record in log_reader.read_log(logfile):
      self.assertEqual(record.observation_id, obs_id)
      # create_example starts the score at 1.2 and bumps it by 1 per
      # observation.
      self.assertAlmostEqual(record.score[0], 1.2 + obs_id)
      obs_id += 1
    # The single context written by create_example has exactly 2 observations.
    self.assertEqual(obs_id, 2)

  def test_to_numpy(self):
    """Feature buffers convert to numpy arrays with the values written."""
    logfile = self.create_tempfile()
    create_example(logfile)
    # Initial values written by create_example; each observation adds 1.
    t0_val = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    t1_val = [1, 2, 3]
    for record in log_reader.read_log(logfile):
      np.testing.assert_allclose(record.feature_values[0].to_numpy(),
                                 np.array(t0_val))
      np.testing.assert_allclose(record.feature_values[1].to_numpy(),
                                 np.array(t1_val))
      t0_val = [v + 1 for v in t0_val]
      t1_val = [v + 1 for v in t1_val]

  def test_seq_example_conversion(self):
    """A 2-context log converts to one SequenceExample per context."""
    logfile = self.create_tempfile()
    create_example(logfile, nr_contexts=2)
    seq_examples = log_reader.read_log_as_sequence_examples(logfile)
    self.assertIn('context_nr_0', seq_examples)
    self.assertIn('context_nr_1', seq_examples)
    # Context 1's int feature is the context-0 values shifted by 10 and +1
    # (see create_example's per-context / per-observation increments).
    self.assertEqual(
        seq_examples['context_nr_1'].feature_lists.feature_list['tensor_name1']
        .feature[0].int64_list.value, [12, 13, 14])
    # each context has 2 observations. The reward is scalar, the
    # 2 features' shapes are given in `create_example` above.
    expected_ctx_0 = text_format.Parse(
        """
        feature_lists {
          feature_list {
            key: "reward"
            value {
              feature {
                float_list {
                  value: 1.2000000476837158
                }
              }
              feature {
                float_list {
                  value: 2.200000047683716
                }
              }
            }
          }
          feature_list {
            key: "tensor_name1"
            value {
              feature {
                int64_list {
                  value: 1
                  value: 2
                  value: 3
                }
              }
              feature {
                int64_list {
                  value: 2
                  value: 3
                  value: 4
                }
              }
            }
          }
          feature_list {
            key: "tensor_name2"
            value {
              feature {
                float_list {
                  value: 0.10000000149011612
                  value: 0.20000000298023224
                  value: 0.30000001192092896
                  value: 0.4000000059604645
                  value: 0.5
                  value: 0.6000000238418579
                }
              }
              feature {
                float_list {
                  value: 1.100000023841858
                  value: 1.2000000476837158
                  value: 1.2999999523162842
                  value: 1.399999976158142
                  value: 1.5
                  value: 1.600000023841858
                }
              }
            }
          }
        }
        """, tf.train.SequenceExample())
    self.assertProtoEquals(expected_ctx_0, seq_examples['context_nr_0'])

  def test_errors(self):
    """A log corrupted at any marker position fails to parse."""
    logfile = self.create_tempfile()
    for error_marker in LogTestExampleBuilder.ErrorMarkers:
      # Skip NONE: an uncorrupted log parses fine.
      if not error_marker:
        continue
      create_example(logfile, introduce_errors_pos=error_marker)
      with self.assertRaises(Exception):
        log_reader.read_log_as_sequence_examples(logfile)

  def test_truncated_tensors(self):
    """A tensor buffer shorter than the header's spec fails to parse."""
    logfile = self.create_tempfile()
    with open(logfile, 'wb') as f:
      writer = LogTestExampleBuilder(opened_file=f)
      writer.write_header({
          'features': [{
              'name': 'tensor_name',
              'port': 0,
              'shape': [2, 3],
              'type': 'float',
          }],
          'score': {
              'name': 'reward',
              'port': 0,
              'shape': [1],
              'type': 'float'
          }
      })
      writer.write_newline()
      writer.write_context_marker('whatever')
      writer.write_observation_marker(0)
      # Header promises 6 floats; write a single int16 instead.
      writer.write_buff([1], ctypes.c_int16)
    with self.assertRaises(Exception):
      log_reader.read_log_as_sequence_examples(logfile)
# Run the tests under the TensorFlow test runner when executed directly.
if __name__ == '__main__':
  tf.test.main()