blob: 2d47a7ddb258be37107865c7f8104df83bee5e26 [file] [log] [blame]
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========================================================================
"""Tensor Tracer report generation utilities."""
import collections
import hashlib
import os
from tensorflow.python.platform import gfile
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.tpu import tensor_tracer_pb2
# Prefix that TTReportHandle._write_report puts in front of everything it
# writes to the report file or to the log.
_TRACER_LOG_PREFIX = ' [>>>TT>>>]'
# Markers that open and close a named section of the text report. Each marker
# is followed by one of the _SECTION_NAME_* values below.
_MARKER_SECTION_BEGIN = '!!!!!!! section-begin:'
_MARKER_SECTION_END = '!!!!!!! section-end:'
# Names of the sections written by the TTReportHandle._write_* methods.
_SECTION_NAME_CONFIG = 'configuration'
_SECTION_NAME_REASON = 'reason'
_SECTION_NAME_OP_LIST = 'op-list'
_SECTION_NAME_TENSOR_LIST = 'tensor-list'
_SECTION_NAME_CACHE_INDEX_MAP = 'cache-index-map'
_SECTION_NAME_GRAPH = 'graph'
_SECTION_NAME_TENSOR_TRACER_CHECKPOINT = 'tensor_tracer_checkpoint'
# Field labels; each is written as '<label> <value>' inside a section.
_FIELD_NAME_VERSION = 'version:'
_FIELD_NAME_DEVICE = 'device:'
_FIELD_NAME_TRACE_MODE = 'trace-mode:'
_FIELD_NAME_SUBMODE = 'submode:'
_FIELD_NAME_NUM_REPLICAS = 'num-replicas:'
_FIELD_NAME_NUM_REPLICAS_PER_HOST = 'num-replicas-per-host:'
_FIELD_NAME_NUM_HOSTS = 'num-hosts:'
_FIELD_NAME_NUM_OPS = 'number-of-ops:'
_FIELD_NAME_NUM_TENSORS = 'number-of-tensors:'
_FIELD_NAME_NUM_CACHE_INDICES = 'number-of-indices:'
_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED = 'topological-sort-succeed:'
# Value assigned to TensorTracerConfig.version at construction time.
_CURRENT_VERSION = 'use-outside-compilation'
# Base file name of the serialized report proto; report_proto_path appends
# '.<summary tag>' to it.
_TT_REPORT_PROTO = 'tensor_tracer_report.report_pb'
def topological_sort(g):
  """Performs topological sort on the given graph.

  Edges that leave 'NextIteration' ops are ignored, so the back edge of a
  while-loop is not reported as a cycle.

  Args:
    g: the graph. Its operations are obtained with g.get_operations().

  Returns:
    A pair (contains_cycle, ops):
      * (False, sorted_ops) when the sort succeeded; sorted_ops is the list
        of all operations in topological order.
      * (True, left_ops) when a cycle was found; left_ops is the set of
        operations that could not be ordered.
    Note that the first element is True when a cycle IS found.

  Raises:
    ValueError: if an operation is reached through more edges than its
      computed in-degree accounts for.
  """
  def _is_loop_edge(op):
    """Returns true if the op is the end of a while-loop creating a cycle."""
    return op.type == 'NextIteration'

  def _in_op_degree(op):
    """Returns the number of incoming edges to the given op.

    The edge calculation skips the edges that come from 'NextIteration' ops.
    NextIteration creates a cycle in the graph. We break cycles by treating
    this op as 'sink' and ignoring all outgoing edges from it.

    Args:
      op: Tf.Operation

    Returns:
      the number of incoming edges.
    """
    producers = op.control_inputs + [in_tensor.op for in_tensor in op.inputs]
    return sum(1 for producer in producers if not _is_loop_edge(producer))

  sorted_ops = []
  op_in_degree = {op: _in_op_degree(op) for op in g.get_operations()}

  # Start from the ops without (non-loop) inputs. Sorting by name keeps the
  # resulting order independent of dict iteration order.
  frontier = [op for (op, degree) in op_in_degree.items() if degree == 0]
  frontier.sort(key=lambda candidate: candidate.name)
  while frontier:
    op = frontier.pop()
    # Remove the op from graph, and remove its outgoing edges.
    sorted_ops.append(op)
    if _is_loop_edge(op):
      # Outgoing edges of NextIteration were never counted; nothing to remove.
      continue
    # pylint: disable=protected-access
    consumers = list(op._control_outputs)
    # pylint: enable=protected-access
    for out_tensor in op.outputs:
      consumers.extend(out_tensor.consumers())
    consumers.sort(key=lambda candidate: candidate.name)
    for consumer in consumers:
      # For each deleted edge shift the bucket of the vertex.
      op_in_degree[consumer] -= 1
      if op_in_degree[consumer] == 0:
        frontier.append(consumer)
      if op_in_degree[consumer] < 0:
        raise ValueError('consumer:%s degree mismatch'%consumer.name)

  # Anything that still has unresolved inputs is part of (or behind) a cycle.
  left_ops = set(op for (op, degree) in op_in_degree.items() if degree > 0)
  if left_ops:
    return (True, left_ops)
  else:
    assert len(g.get_operations()) == len(sorted_ops)
    return (False, sorted_ops)
class TensorTracerConfig(object):
  """Plain holder for the Tensor Tracer version and run environment.

  A new instance carries the current Tensor Tracer version; the device type
  and the replica/host counts start out as None and are expected to be
  assigned by whoever builds the config.
  """

  def __init__(self):
    self.version = _CURRENT_VERSION
    # Environment fields are unknown at construction time.
    self.device_type = self.num_replicas = None
    self.num_replicas_per_host = self.num_hosts = None
class TensorTraceOrder(object):
  """Class that is responsible for storing the trace-id of the tensors.

  Attributes set by the constructor:
    graph_order: object exposing `tensor_to_idx`, a map from tensor name to
      the tensor's index in the graph order.
    traced_tensors: the traced tensors, in cache-index order.
    tensorname_to_cache_idx: map from traced tensor name to its cache index.
    cache_idx_to_tensor_idx: list where position `cache_idx` holds the index
      of that tensor in `graph_order`.
  """

  def __init__(self, graph_order, traced_tensors):
    """Stores the inputs and builds the cache-index maps.

    Args:
      graph_order: object with a `tensor_to_idx` mapping (tensor name to
        index).
      traced_tensors: iterable of objects with a `name` attribute.

    Raises:
      ValueError: if a tensor name appears twice in `traced_tensors` or is
        missing from `graph_order.tensor_to_idx`.
    """
    self.graph_order = graph_order
    self.traced_tensors = traced_tensors
    self._create_tensor_maps()

  def _create_tensor_maps(self):
    """Creates tensor to cache id maps."""
    self.tensorname_to_cache_idx = {}
    self.cache_idx_to_tensor_idx = []
    for out_tensor in self.traced_tensors:
      tensor_name = out_tensor.name
      if tensor_name in self.tensorname_to_cache_idx:
        raise ValueError('Tensor name {} should not be already in '
                         'tensorname_to_cache_idx'.format(tensor_name))
      if tensor_name not in self.graph_order.tensor_to_idx:
        raise ValueError(
            'Tensor name {} is not in the tensor_to_idx, tensor_to_idx={} '
            .format(tensor_name, self.graph_order.tensor_to_idx))
      tensor_idx = self.graph_order.tensor_to_idx[tensor_name]
      # Cache indices are handed out densely, in the order tensors are seen.
      cache_idx = len(self.tensorname_to_cache_idx)
      self.tensorname_to_cache_idx[tensor_name] = cache_idx
      self.cache_idx_to_tensor_idx.append(tensor_idx)
      # Invariant guard: both containers grow by exactly one entry per tensor.
      if len(self.tensorname_to_cache_idx) != len(
          self.cache_idx_to_tensor_idx):
        raise RuntimeError(
            'len(self.tensorname_to_cache_idx) must equal '
            'len(self.cache_idx_to_tensor_idx), got '
            'len(self.tensorname_to_cache_idx)={}, '
            'len(self.cache_idx_to_tensor_idx)={}'
            .format(
                len(self.tensorname_to_cache_idx),
                len(self.cache_idx_to_tensor_idx)))
def sort_tensors_and_ops(graph):
  """Builds a GraphWrapper with a consistent ordering of ops and tensors.

  Operations follow the topological order when the graph sorts cleanly and
  fall back to graph.get_operations() when a cycle is found. Tensors are the
  outputs of those operations, concatenated in operation order.

  Args:
    graph: the graph to wrap.

  Returns:
    A 'GraphWrapper' namedtuple with fields graph, operations, op_to_idx
    (op name -> position), tensors, tensor_to_idx (tensor name -> position),
    contains_cycle and topological_order_or_cycle.
  """
  wrapper_cls = collections.namedtuple(
      'GraphWrapper',
      ['graph', 'operations', 'op_to_idx', 'tensors', 'tensor_to_idx',
       'contains_cycle', 'topological_order_or_cycle'])

  has_cycle, order_or_cycle = topological_sort(graph)
  ordered_ops = graph.get_operations() if has_cycle else order_or_cycle

  name_to_op_pos = {}
  for position, operation in enumerate(ordered_ops):
    name_to_op_pos[operation.name] = position

  all_outputs = [out for operation in ordered_ops for out in operation.outputs]

  name_to_tensor_pos = {}
  for position, out in enumerate(all_outputs):
    name_to_tensor_pos[out.name] = position

  return wrapper_cls(graph, ordered_ops, name_to_op_pos, all_outputs,
                     name_to_tensor_pos, has_cycle, order_or_cycle)
class OpenReportFile(object):
  """Context manager for writing report file.

  Entering the context yields the opened report file, or None when
  `tt_parameters.report_file_path` is empty. The file, if any, is closed when
  the context exits.
  """

  def __init__(self, tt_parameters):
    """Opens the report file for writing if a path is configured.

    Args:
      tt_parameters: object with a `report_file_path` attribute.

    Raises:
      Whatever gfile.Open raises when the file cannot be opened; the error is
      propagated unchanged.
    """
    if tt_parameters.report_file_path:
      self._report_file = gfile.Open(tt_parameters.report_file_path, 'w')
    else:
      self._report_file = None

  def __enter__(self):
    return self._report_file

  def __exit__(self, unused_type, unused_value, unused_traceback):
    # Exceptions from the with-body are not suppressed (implicit None return).
    if self._report_file:
      self._report_file.close()
def proto_fingerprint(message_proto):
  """Returns the SHA-256 hex digest of the serialized message.

  Args:
    message_proto: object whose SerializeToString() returns bytes.

  Returns:
    The digest as a hexadecimal string.
  """
  # NOTE(review): SerializeToString() is called with its default arguments;
  # confirm whether map fields need deterministic serialization for the
  # fingerprint to be stable across processes.
  return hashlib.sha256(message_proto.SerializeToString()).hexdigest()
class TTReportHandle(object):
  """Utility class responsible for creating a tensor tracer report."""

  def __init__(self):
    # Maps an op or tensor name to the explanation recorded for it.
    self.instrument_records = {}
    # Report file that is open while create_report runs. When it is None,
    # _write_report sends the content to the log instead.
    self._report_file = None

  def instrument(self, name, explanation):
    """Records `explanation` for `name`, replacing any earlier record."""
    self.instrument_records[name] = explanation

  def instrument_op(self, op, explanation):
    """Records `explanation` under the name of the given op."""
    self.instrument(op.name, explanation)

  def instrument_tensor(self, tensor, explanation):
    """Records `explanation` under the name of the given tensor."""
    self.instrument(tensor.name, explanation)

  def create_report_proto(self, tt_config, tt_parameters, tensor_trace_order,
                          tensor_trace_points, collected_signature_types):
    """Creates and returns a proto that stores tensor tracer configuration.

    Args:
      tt_config: TensorTracerConfig object holding information about the run
        environment (device, # cores, # hosts), and tensor tracer version
        information.
      tt_parameters: TTParameters objects storing the user provided parameters
        for tensor tracer.
      tensor_trace_order: TensorTraceOrder object storing a topological order of
        the graph.
      tensor_trace_points: Programmatically added trace_points/checkpoints.
        Used here as a mapping from tensor name to trace point name.
      collected_signature_types: The signature types collected, e.g. norm,
        max, min, mean... A mapping from signature name to a sortable value;
        signatures are written in ascending order of that value.

    Returns:
      TensorTracerReport proto.
    """
    # NOTE(review): this method looks tensor names up in tensor_trace_points
    # as in a mapping, while _write_trace_points iterates the argument of the
    # same name as (tensor, checkpoint_name) pairs. Confirm that callers pass
    # the shape each method expects.
    report = tensor_tracer_pb2.TensorTracerReport()
    report.config.version = tt_config.version
    report.config.device = tt_config.device_type
    report.config.num_cores = tt_config.num_replicas
    report.config.num_hosts = tt_config.num_hosts
    report.config.num_cores_per_host = tt_config.num_replicas_per_host
    report.config.submode = tt_parameters.submode
    report.config.trace_mode = tt_parameters.trace_mode

    for signature_name, _ in sorted(collected_signature_types.items(),
                                    key=lambda x: x[1]):
      report.config.signatures.append(signature_name)

    for tensor in tensor_trace_order.graph_order.tensors:
      tensor_def = tensor_tracer_pb2.TensorTracerReport.TracedTensorDef()
      tensor_def.name = tensor.name
      if tensor.name in tensor_trace_order.tensorname_to_cache_idx:
        tensor_def.is_traced = True
        tensor_def.cache_index = (
            tensor_trace_order.tensorname_to_cache_idx[tensor.name])
      else:
        # To prevent small changes affecting the fingerprint calculation, avoid
        # writing the untraced tensors to metadata. Fingerprints will be
        # different only when the list of the traced tensors are different.
        if tt_parameters.use_fingerprint_subdir:
          continue
        tensor_def.is_traced = False

      if tensor.name in tensor_trace_points:
        tensor_def.trace_point_name = tensor_trace_points[tensor.name]
      # An explanation recorded for the tensor itself takes precedence over
      # one recorded for the op that produces it.
      if tensor.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.name]
      elif tensor.op.name in self.instrument_records:
        tensor_def.explanation = self.instrument_records[tensor.op.name]
      report.tensordef[tensor.name].CopyFrom(tensor_def)

    # The fingerprint is computed before the graphdef is attached, so it
    # covers only the config and the tensor definitions.
    report.fingerprint = proto_fingerprint(report)
    logging.info('TensorTracerProto fingerprint is %s.',
                 report.fingerprint)
    tf_graph = tensor_trace_order.graph_order.graph
    report.graphdef.CopyFrom(tf_graph.as_graph_def())
    return report

  def report_proto_path(self, trace_dir, summary_tag_name):
    """Returns the path where report proto should be written.

    Args:
      trace_dir: String denoting the trace directory.
      summary_tag_name: Name of the unique tag that relates to
                        the report.

    Returns:
      A string denoting the path to the report proto.
    """
    # '/' in the tag would otherwise introduce extra path components.
    filename = _TT_REPORT_PROTO + '.' + summary_tag_name.replace('/', '_')
    return os.path.join(trace_dir, filename)

  def write_report_proto(self, report_path, report_proto, tt_parameters):
    """Writes the serialized report proto to report_path.

    Args:
      report_path: path of the file to write.
      report_proto: proto to serialize.
      tt_parameters: object whose `trace_dir` is created before writing.
    """
    gfile.MakeDirs(tt_parameters.trace_dir)
    with gfile.GFile(report_path, 'wb') as f:
      f.write(report_proto.SerializeToString())

  def create_report(self, tt_config, tt_parameters,
                    tensor_trace_order, tensor_trace_points):
    """Creates a report file and writes the trace information.

    When `tt_parameters.report_file_path` is empty no file is opened and the
    report content is written to the log instead.

    Args:
      tt_config: TensorTracerConfig describing the run environment.
      tt_parameters: user provided tensor tracer parameters.
      tensor_trace_order: TensorTraceOrder for the traced graph.
      tensor_trace_points: iterable of (tensor, checkpoint_name) pairs.
    """
    with OpenReportFile(tt_parameters) as report_file:
      self._report_file = report_file
      try:
        self._write_config_section(tt_config, tt_parameters)
        self._write_op_list_section(tensor_trace_order.graph_order)
        self._write_tensor_list_section(tensor_trace_order.graph_order)
        self._write_trace_points(tensor_trace_points)
        self._write_cache_index_map_section(tensor_trace_order)
        self._write_reason_section()
        self._write_graph_section(tensor_trace_order.graph_order)
      finally:
        # The file is closed when the with-block exits; drop the reference so
        # a later _write_report cannot write to a closed file.
        self._report_file = None

  def _write_trace_points(self, tensor_trace_points):
    """Writes the list of checkpoints."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))
    for (tensor, checkpoint_name) in tensor_trace_points:
      self._write_report('%s %s\n'%(tensor.name, checkpoint_name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_TRACER_CHECKPOINT))

  def _write_report(self, content):
    """Writes the given content to the report."""
    line = '%s %s'%(_TRACER_LOG_PREFIX, content)
    if self._report_file:
      self._report_file.write(line)
    else:
      logging.info(line)

  def _write_config_section(self, tt_config, tt_parameters):
    """Writes the config section of the report."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_CONFIG))
    self._write_report('%s %s\n'%(_FIELD_NAME_VERSION, tt_config.version))
    self._write_report('%s %s\n'%(_FIELD_NAME_DEVICE, tt_config.device_type))
    self._write_report('%s %s\n'%(_FIELD_NAME_TRACE_MODE,
                                  tt_parameters.trace_mode))
    self._write_report('%s %s\n'%(_FIELD_NAME_SUBMODE,
                                  tt_parameters.submode))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS,
                                  tt_config.num_replicas))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_REPLICAS_PER_HOST,
                                  tt_config.num_replicas_per_host))
    self._write_report('%s %s\n'%(_FIELD_NAME_NUM_HOSTS, tt_config.num_hosts))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_CONFIG))

  def _write_reason_section(self):
    """Writes the reason section of the report."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_REASON))
    # Sorted by name so the section does not depend on recording order.
    for key in sorted(self.instrument_records):
      self._write_report('"%s" %s\n'%(key, self.instrument_records[key]))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_REASON))

  def _write_op_list_section(self, graph_order):
    """Writes the Op-list section of the report.

    Each op line holds the op index, quoted op name, op type and the indices
    of the op's output tensors.

    Raises:
      ValueError: if an output tensor is missing from
        graph_order.tensor_to_idx.
    """
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_OP_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_OPS,
                                  len(graph_order.operations)))
    for i, op in enumerate(graph_order.operations):
      line = '%d "%s" %s'%(i, op.name, op.type)
      for out_tensor in op.outputs:
        if out_tensor.name not in graph_order.tensor_to_idx:
          raise ValueError(
              'out_tensor is not in tensor_to_idx. out_tensor={}, '
              'tensor_to_idx={}'
              .format(out_tensor.name, graph_order.tensor_to_idx))
        line += ' %d'%graph_order.tensor_to_idx[out_tensor.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_OP_LIST))

  def _write_tensor_list_section(self, graph_order):
    """Writes the tensor-list section of the report.

    Each tensor line holds the tensor index, quoted tensor name and the
    indices of the ops consuming the tensor, ordered by consumer name.

    Raises:
      ValueError: if a consumer op is missing from graph_order.op_to_idx.
    """
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_TENSOR_LIST))
    self._write_report('%s %d\n'%(_FIELD_NAME_NUM_TENSORS,
                                  len(graph_order.tensors)))
    for i, tensor in enumerate(graph_order.tensors):
      line = '%d "%s"'%(i, tensor.name)
      # sorted() leaves the list returned by consumers() untouched.
      consumers = sorted(tensor.consumers(), key=lambda op: op.name)
      for consumer_op in consumers:
        if consumer_op.name not in graph_order.op_to_idx:
          raise ValueError(
              'consumer_op is not in op_to_idx.  '
              'got consumer_op={}, op_to_idx={}'
              .format(consumer_op.name, graph_order.op_to_idx))
        line += ' %d'%graph_order.op_to_idx[consumer_op.name]
      line += '\n'
      self._write_report(line)
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_TENSOR_LIST))

  def _write_cache_index_map_section(self, tensor_trace_order):
    """Writes the mapping from cache index to tensor index to the report."""
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN,
                                  _SECTION_NAME_CACHE_INDEX_MAP))
    self._write_report('%s %d\n'%(
        _FIELD_NAME_NUM_CACHE_INDICES,
        len(tensor_trace_order.cache_idx_to_tensor_idx)))
    for cache_idx, tensor_idx in enumerate(
        tensor_trace_order.cache_idx_to_tensor_idx):
      self._write_report('%d %d\n'%(cache_idx, tensor_idx))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END,
                                  _SECTION_NAME_CACHE_INDEX_MAP))

  def _write_graph_section(self, graph_order):
    """Writes the graph section of the report.

    Lists either the topological order of the ops or, when the sort found a
    cycle, the ops that could not be ordered.
    """
    self._write_report('%s %s\n'%(_MARKER_SECTION_BEGIN, _SECTION_NAME_GRAPH))
    self._write_report('%s %s\n'%(_FIELD_NAME_TOPOLOGICAL_SORT_SUCCEED,
                                  not graph_order.contains_cycle))
    for i, op in enumerate(graph_order.topological_order_or_cycle):
      self._write_report('%d "%s"\n'%(i, op.name))
    self._write_report('%s %s\n'%(_MARKER_SECTION_END, _SECTION_NAME_GRAPH))