# coding=utf-8
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reader for the simple log format (non-protobuf).
The "simple" log format consists of a sequence of one-line json strings and
raw dump of tensor data - with the assumption that the endianness of the
reader is the same as the endianness of the producer - as follows:

header: a json object describing the feature tensors and the outcome tensor.
The feature tensors are in an array - the order matters (as will become
apparent).
Example: {"features": [{tensor spec}, {tensor spec}], "score": {tensor spec}}
The tensor spec is a json object:
{"name": .., "port": .., "shape": [..], "type": ".."}

context: a json object naming the context to which the observations that
follow refer. For inlining, the context is "default" (the module); for
regalloc, it is a function name.
Example: {"context": "_ZNfoobar"}

observation: a json object announcing that an observation - i.e. feature
data - follows. It also contains the observation count (0, 1, ...).
Example: {"observation": 0}
A buffer containing the dump of tensor data, in the order given in the header,
follows here. A newline terminates it - just so that the next json string
appears at the beginning of a line in case the log is opened with an editor
(so purely for debugging). The reader should use the header data to know how
much data to read and to which tensors it corresponds; it must not rely on the
terminating \n as a delimiter, and should just assume it is there and consume
it upon finishing reading the tensor data.

outcome: a json object announcing that the score/reward data follows. It also
carries an id which should match that of the observation before (for
debugging).
Example: {"outcome": 0}
A buffer containing the outcome tensor follows - same idea as for features.

The above repeats - either a new observation follows, or a new context.

Refer to llvm/lib/Analysis/models/log_reader.py (in the llvm repo), which is
used there for testing.
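
For illustration, a log with one context and one observation might look like
this (hypothetical tensor names and shapes; <raw N bytes ...> stands for the
binary dump of the named tensor, terminated by \n):
{"features": [{"name": "x", "port": 0, "shape": [2], "type": "int64_t"}], "score": {"name": "reward", "port": 0, "shape": [1], "type": "float"}}
{"context": "_Zfoo"}
{"observation": 0}
<raw 16 bytes for "x">
{"outcome": 0}
<raw 4 bytes for "reward">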
"""
import collections
import ctypes
import dataclasses
import json
import math
from typing import Any, BinaryIO, Dict, Generator, List, Optional
import numpy as np
import tensorflow as tf
_element_type_name_map = {
'float': (ctypes.c_float, tf.float32),
'double': (ctypes.c_double, tf.float64),
'int8_t': (ctypes.c_int8, tf.int8),
'uint8_t': (ctypes.c_uint8, tf.uint8),
'int16_t': (ctypes.c_int16, tf.int16),
'uint16_t': (ctypes.c_uint16, tf.uint16),
'int32_t': (ctypes.c_int32, tf.int32),
'uint32_t': (ctypes.c_uint32, tf.uint32),
'int64_t': (ctypes.c_int64, tf.int64),
'uint64_t': (ctypes.c_uint64, tf.uint64)
}
_element_type_name_to_dtype = {
name: dtype for name, (_, dtype) in _element_type_name_map.items()
}
_dtype_to_ctype = {
dtype: ctype for _, (ctype, dtype) in _element_type_name_map.items()
}
def convert_dtype_to_ctype(dtype: tf.dtypes.DType) -> type:
"""Public interface for the _dtype_to_ctype dict."""
return _dtype_to_ctype[dtype]
def create_tensorspec(d: Dict[str, Any]) -> tf.TensorSpec:
  """Creates a tf.TensorSpec from its json dict representation (see header)."""
  name: str = d['name']
shape: List[int] = [int(e) for e in d['shape']]
element_type_str: str = d['type']
if element_type_str not in _element_type_name_to_dtype:
    raise ValueError(f'unknown type: {element_type_str}')
return tf.TensorSpec(
name=name,
shape=tf.TensorShape(shape),
dtype=_element_type_name_to_dtype[element_type_str])
class LogReaderTensorValue:
"""The value of a tensor of a given spec.
We root the bytes buffer which provide the underlying data, and index in
the value based on the type of the tensor, thus the TensorValue can be used
as a list-like object containing the scalar values, in row-major order, of
the tensor.
Endianness is assumed to be the same as the log producer's.
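
  For illustration, on a little-endian machine (hypothetical spec and buffer):
    spec = tf.TensorSpec(name='x', shape=[2], dtype=tf.int32)
    tv = LogReaderTensorValue(spec, b'\x01\x00\x00\x00\x02\x00\x00\x00')
    assert len(tv) == 2 and tv[0] == 1 and tv[1] == 2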
"""
__slots__ = ('_buffer', '_spec', '_len', '_view')
def __init__(self, spec: tf.TensorSpec, buffer: bytes):
self._buffer = buffer
self._spec = spec
self._len = math.prod(spec.shape)
self._set_view()
@property
def spec(self):
return self._spec
def to_numpy(self) -> np.ndarray:
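    # Note: returns a flat (row-major) array; the spec's shape is not applied.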
return np.frombuffer(
self._buffer,
dtype=convert_dtype_to_ctype(self._spec.dtype),
count=self._len)
def _set_view(self):
    # c_char_p is a nul-terminated string, so the more appropriate cast here
    # would be POINTER(c_char), but unfortunately, c_char_p is the only
    # type that can be constructed from a `bytes`. To capture our intent,
    # we cast the c_char_p to a POINTER(c_char) first, then to a pointer to
    # the tensor's element type.
buffer_as_nul_ending_ptr = ctypes.c_char_p(self._buffer)
buffer_as_naked_ptr = ctypes.cast(buffer_as_nul_ending_ptr,
ctypes.POINTER(ctypes.c_char))
self._view = ctypes.cast(buffer_as_naked_ptr,
ctypes.POINTER(_dtype_to_ctype[self.spec.dtype]))
def __len__(self) -> int:
return self._len
def __getitem__(self, index: int):
if index < 0 or index >= self._len:
raise IndexError(f'Index {index} out of range [0..{self._len})')
# pytype believes `index` is an object, despite all the annotations to the
# contrary.
return self._view[index] # pytype:disable=wrong-arg-types
@dataclasses.dataclass(frozen=True)
class _Header:
features: List[tf.TensorSpec]
score: Optional[tf.TensorSpec]
def _read_tensor(fs: BinaryIO, ts: tf.TensorSpec) -> LogReaderTensorValue:
size = math.prod(ts.shape) * ctypes.sizeof(_dtype_to_ctype[ts.dtype])
data = fs.read(size)
return LogReaderTensorValue(ts, data)
def _read_header(f: BinaryIO) -> Optional[_Header]:
header_raw = f.readline()
if not header_raw:
# This is the path taken by empty files
return None
header = json.loads(header_raw)
tensor_specs = [create_tensorspec(ts) for ts in header['features']]
score_spec = create_tensorspec(header['score']) if 'score' in header else None
return _Header(features=tensor_specs, score=score_spec)
@dataclasses.dataclass(frozen=True)
class ObservationRecord:
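  """One observation: the feature values and (optional) score for one step,
  together with the context it belongs to."""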
context: str
observation_id: int
feature_values: List[LogReaderTensorValue]
score: Optional[LogReaderTensorValue]
def _enumerate_log_from_stream(
f: BinaryIO, header: _Header) -> Generator[ObservationRecord, None, None]:
"""A generator that returns Record objects from a log file.
It is assumed the log file's header was read separately.
"""
tensor_specs = header.features
score_spec = header.score
context = None
while event_str := f.readline():
event = json.loads(event_str)
if 'context' in event:
context = event['context']
continue
observation_id = int(event['observation'])
features = []
for ts in tensor_specs:
features.append(_read_tensor(f, ts))
    f.readline()  # Consume the \n terminating the feature tensor dump.
    score = None
    if score_spec is not None:
      score_header = json.loads(f.readline())
      assert int(score_header['outcome']) == observation_id
      score = _read_tensor(f, score_spec)
      f.readline()  # Consume the \n terminating the outcome tensor dump.
yield ObservationRecord(
context=context,
observation_id=observation_id,
feature_values=features,
score=score)
def read_log_from_file(f: BinaryIO) -> Generator[ObservationRecord, None, None]:
header = _read_header(f)
if header:
yield from _enumerate_log_from_stream(f, header)
def read_log(fname: str) -> Generator[ObservationRecord, None, None]:
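  """Yields ObservationRecords from the log file at `fname`.

  For illustration (hypothetical log path):
    for record in read_log('/tmp/log.txt'):
      print(record.context, record.observation_id)
  """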
with open(fname, 'rb') as f:
yield from read_log_from_file(f)
def _add_feature(se: tf.train.SequenceExample, spec: tf.TensorSpec,
value: LogReaderTensorValue):
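  """Appends `value` as a new entry in the SequenceExample's feature list
  named by `spec`."""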
f = se.feature_lists.feature_list[spec.name].feature.add()
# This should never happen: _add_feature is an implementation detail of
# read_log_as_sequence_examples, and the only dtypes we should see here are
# those in _element_type_name_map, or an exception would have been thrown
# already.
if spec.dtype not in _dtype_to_ctype:
    raise ValueError(f'Unsupported dtype: {spec.dtype}')
if spec.dtype in [tf.float32, tf.float64]:
lst = f.float_list.value
else:
lst = f.int64_list.value
lst.extend(value)
def read_log_as_sequence_examples(
fname: str) -> Dict[str, tf.train.SequenceExample]:
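  """Reads a log and returns a dict of context -> tf.train.SequenceExample.

  For illustration (hypothetical log path and a feature tensor named 'x'):
    examples = read_log_as_sequence_examples('/tmp/log.txt')
    for ctx, se in examples.items():
      # One entry per observation, in time order.
      print(ctx, len(se.feature_lists.feature_list['x'].feature))
  """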
ret: Dict[str, tf.train.SequenceExample] = collections.defaultdict(
tf.train.SequenceExample)
  # A record is one observation: the features and score for one step.
  # Records are in time order. The `context` is, for example, the function
  # name for passes like regalloc. We produce a dictionary keyed by context,
  # with SequenceExample values.
for record in read_log(fname):
se = ret[record.context]
    if record.score is not None:
_add_feature(se, record.score.spec, record.score)
for t in record.feature_values:
_add_feature(se, t.spec, t)
return ret