blob: b4d5d4e610f1b375f0d8a0a46d3ff6991406afea [file] [log] [blame]
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""TensorFlow monitoring APIs."""
import collections
import functools
import time
from tensorflow.core.framework import summary_pb2
from tensorflow.python import pywrap_tfe
from tensorflow.python.client import pywrap_tf_session
from tensorflow.python.framework import c_api_util
from tensorflow.python.util import compat
from tensorflow.python.util.tf_export import tf_export
_MetricMethod = collections.namedtuple('MetricMethod', 'create delete get_cell')
_counter_methods = [
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewCounter0,
delete=pywrap_tfe.TFE_MonitoringDeleteCounter0,
get_cell=pywrap_tfe.TFE_MonitoringGetCellCounter0),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewCounter1,
delete=pywrap_tfe.TFE_MonitoringDeleteCounter1,
get_cell=pywrap_tfe.TFE_MonitoringGetCellCounter1),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewCounter2,
delete=pywrap_tfe.TFE_MonitoringDeleteCounter2,
get_cell=pywrap_tfe.TFE_MonitoringGetCellCounter2),
]
_int_gauge_methods = [
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewIntGauge0,
delete=pywrap_tfe.TFE_MonitoringDeleteIntGauge0,
get_cell=pywrap_tfe.TFE_MonitoringGetCellIntGauge0),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewIntGauge1,
delete=pywrap_tfe.TFE_MonitoringDeleteIntGauge1,
get_cell=pywrap_tfe.TFE_MonitoringGetCellIntGauge1),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewIntGauge2,
delete=pywrap_tfe.TFE_MonitoringDeleteIntGauge2,
get_cell=pywrap_tfe.TFE_MonitoringGetCellIntGauge2),
]
_string_gauge_methods = [
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewStringGauge0,
delete=pywrap_tfe.TFE_MonitoringDeleteStringGauge0,
get_cell=pywrap_tfe.TFE_MonitoringGetCellStringGauge0),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewStringGauge1,
delete=pywrap_tfe.TFE_MonitoringDeleteStringGauge1,
get_cell=pywrap_tfe.TFE_MonitoringGetCellStringGauge1),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewStringGauge2,
delete=pywrap_tfe.TFE_MonitoringDeleteStringGauge2,
get_cell=pywrap_tfe.TFE_MonitoringGetCellStringGauge2),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewStringGauge3,
delete=pywrap_tfe.TFE_MonitoringDeleteStringGauge3,
get_cell=pywrap_tfe.TFE_MonitoringGetCellStringGauge3),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewStringGauge4,
delete=pywrap_tfe.TFE_MonitoringDeleteStringGauge4,
get_cell=pywrap_tfe.TFE_MonitoringGetCellStringGauge4),
]
_bool_gauge_methods = [
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewBoolGauge0,
delete=pywrap_tfe.TFE_MonitoringDeleteBoolGauge0,
get_cell=pywrap_tfe.TFE_MonitoringGetCellBoolGauge0),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewBoolGauge1,
delete=pywrap_tfe.TFE_MonitoringDeleteBoolGauge1,
get_cell=pywrap_tfe.TFE_MonitoringGetCellBoolGauge1),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewBoolGauge2,
delete=pywrap_tfe.TFE_MonitoringDeleteBoolGauge2,
get_cell=pywrap_tfe.TFE_MonitoringGetCellBoolGauge2),
]
_sampler_methods = [
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewSampler0,
delete=pywrap_tfe.TFE_MonitoringDeleteSampler0,
get_cell=pywrap_tfe.TFE_MonitoringGetCellSampler0),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewSampler1,
delete=pywrap_tfe.TFE_MonitoringDeleteSampler1,
get_cell=pywrap_tfe.TFE_MonitoringGetCellSampler1),
_MetricMethod(
create=pywrap_tfe.TFE_MonitoringNewSampler2,
delete=pywrap_tfe.TFE_MonitoringDeleteSampler2,
get_cell=pywrap_tfe.TFE_MonitoringGetCellSampler2),
]
class Metric(object):
"""The base class of metric."""
__slots__ = ["_metric", "_metric_name", "_metric_methods", "_label_length"]
def __init__(self, metric_name, metric_methods, label_length, *args):
"""Creates a new metric.
Args:
metric_name: name of the metric class.
metric_methods: list of swig metric methods.
label_length: length of label args.
*args: the arguments to call create method.
"""
self._metric_name = metric_name
self._metric_methods = metric_methods
self._label_length = label_length
if label_length >= len(self._metric_methods):
raise ValueError('Cannot create {} metric with label >= {}'.format(
self._metric_name, len(self._metric_methods)))
self._metric = self._metric_methods[self._label_length].create(*args)
def __del__(self):
try:
deleter = self._metric_methods[self._label_length].delete
metric = self._metric
except AttributeError:
return
if deleter is not None:
deleter(metric)
def get_cell(self, *labels):
"""Retrieves the cell."""
if len(labels) != self._label_length:
raise ValueError('The {} expects taking {} labels'.format(
self._metric_name, self._label_length))
return self._metric_methods[self._label_length].get_cell(
self._metric, *labels)
class CounterCell(object):
"""CounterCell stores each value of a Counter."""
__slots__ = ["_cell"]
def __init__(self, cell):
"""Creates a new CounterCell.
Args:
cell: A c pointer of TFE_MonitoringCounterCell.
"""
self._cell = cell
def increase_by(self, value):
"""Atomically increments the value.
Args:
value: non-negative value.
"""
pywrap_tfe.TFE_MonitoringCounterCellIncrementBy(self._cell, value)
def value(self):
"""Retrieves the current value."""
return pywrap_tfe.TFE_MonitoringCounterCellValue(self._cell)
class Counter(Metric):
"""A stateful class for updating a cumulative integer metric.
This class encapsulates a set of values (or a single value for a label-less
metric). Each value is identified by a tuple of labels. The class allows the
user to increment each value.
"""
__slots__ = []
def __init__(self, name, description, *labels):
"""Creates a new Counter.
Args:
name: name of the new metric.
description: description of the new metric.
*labels: The label list of the new metric.
"""
super(Counter, self).__init__('Counter', _counter_methods, len(labels),
name, description, *labels)
def get_cell(self, *labels):
"""Retrieves the cell."""
return CounterCell(super(Counter, self).get_cell(*labels))
class IntGaugeCell(object):
"""A single integer value stored in an `IntGauge`."""
__slots__ = ["_cell"]
def __init__(self, cell):
"""Creates a new IntGaugeCell.
Args:
cell: A c pointer of TFE_MonitoringIntGaugeCell.
"""
self._cell = cell
def set(self, value):
"""Atomically set the value.
Args:
value: integer value.
"""
pywrap_tfe.TFE_MonitoringIntGaugeCellSet(self._cell, value)
def value(self):
"""Retrieves the current value."""
return pywrap_tfe.TFE_MonitoringIntGaugeCellValue(self._cell)
class IntGauge(Metric):
"""A stateful class for updating a gauge-like integer metric.
This class encapsulates a set of integer values (or a single value for a
label-less metric). Each value is identified by a tuple of labels. The class
allows the user to set each value.
"""
__slots__ = []
def __init__(self, name, description, *labels):
"""Creates a new IntGauge.
Args:
name: name of the new metric.
description: description of the new metric.
*labels: The label list of the new metric.
"""
super(IntGauge, self).__init__('IntGauge', _int_gauge_methods, len(labels),
name, description, *labels)
def get_cell(self, *labels):
"""Retrieves the cell."""
return IntGaugeCell(super(IntGauge, self).get_cell(*labels))
class StringGaugeCell(object):
"""A single string value stored in an `StringGauge`."""
__slots__ = ["_cell"]
def __init__(self, cell):
"""Creates a new StringGaugeCell.
Args:
cell: A c pointer of TFE_MonitoringStringGaugeCell.
"""
self._cell = cell
def set(self, value):
"""Atomically set the value.
Args:
value: string value.
"""
pywrap_tfe.TFE_MonitoringStringGaugeCellSet(self._cell, value)
def value(self):
"""Retrieves the current value."""
with c_api_util.tf_buffer() as buffer_:
pywrap_tfe.TFE_MonitoringStringGaugeCellValue(self._cell, buffer_)
value = pywrap_tf_session.TF_GetBuffer(buffer_).decode('utf-8')
return value
class StringGauge(Metric):
"""A stateful class for updating a gauge-like string metric.
This class encapsulates a set of string values (or a single value for a
label-less metric). Each value is identified by a tuple of labels. The class
allows the user to set each value.
"""
__slots__ = []
def __init__(self, name, description, *labels):
"""Creates a new StringGauge.
Args:
name: name of the new metric.
description: description of the new metric.
*labels: The label list of the new metric.
"""
super(StringGauge, self).__init__('StringGauge', _string_gauge_methods,
len(labels), name, description, *labels)
def get_cell(self, *labels):
"""Retrieves the cell."""
return StringGaugeCell(super(StringGauge, self).get_cell(*labels))
class BoolGaugeCell(object):
"""A single boolean value stored in an `BoolGauge`."""
__slots__ = ["_cell"]
def __init__(self, cell):
"""Creates a new BoolGaugeCell.
Args:
cell: A c pointer of TFE_MonitoringBoolGaugeCell.
"""
self._cell = cell
def set(self, value):
"""Atomically set the value.
Args:
value: bool value.
"""
pywrap_tfe.TFE_MonitoringBoolGaugeCellSet(self._cell, value)
def value(self):
"""Retrieves the current value."""
return pywrap_tfe.TFE_MonitoringBoolGaugeCellValue(self._cell)
@tf_export("__internal__.monitoring.BoolGauge", v1=[])
class BoolGauge(Metric):
"""A stateful class for updating a gauge-like bool metric.
This class encapsulates a set of boolean values (or a single value for a
label-less metric). Each value is identified by a tuple of labels. The class
allows the user to set each value.
"""
__slots__ = []
def __init__(self, name, description, *labels):
"""Creates a new BoolGauge.
Args:
name: name of the new metric.
description: description of the new metric.
*labels: The label list of the new metric.
"""
super(BoolGauge, self).__init__('BoolGauge', _bool_gauge_methods,
len(labels), name, description, *labels)
def get_cell(self, *labels):
"""Retrieves the cell."""
return BoolGaugeCell(super(BoolGauge, self).get_cell(*labels))
class SamplerCell(object):
"""SamplerCell stores each value of a Sampler."""
__slots__ = ["_cell"]
def __init__(self, cell):
"""Creates a new SamplerCell.
Args:
cell: A c pointer of TFE_MonitoringSamplerCell.
"""
self._cell = cell
def add(self, value):
"""Atomically add a sample.
Args:
value: float value.
"""
pywrap_tfe.TFE_MonitoringSamplerCellAdd(self._cell, value)
def value(self):
"""Retrieves the current distribution of samples.
Returns:
A HistogramProto describing the distribution of samples.
"""
with c_api_util.tf_buffer() as buffer_:
pywrap_tfe.TFE_MonitoringSamplerCellValue(self._cell, buffer_)
proto_data = pywrap_tf_session.TF_GetBuffer(buffer_)
histogram_proto = summary_pb2.HistogramProto()
histogram_proto.ParseFromString(compat.as_bytes(proto_data))
return histogram_proto
class Buckets(object):
"""Bucketing strategies for the samplers."""
__slots__ = ["buckets"]
def __init__(self, buckets):
"""Creates a new Buckets.
Args:
buckets: A c pointer of TFE_MonitoringBuckets.
"""
self.buckets = buckets
def __del__(self):
pywrap_tfe.TFE_MonitoringDeleteBuckets(self.buckets)
class ExponentialBuckets(Buckets):
"""Exponential bucketing strategy.
Sets up buckets of the form:
[-DBL_MAX, ..., scale * growth^i,
scale * growth_factor^(i + 1), ..., DBL_MAX].
"""
__slots__ = []
def __init__(self, scale, growth_factor, bucket_count):
"""Creates a new exponential Buckets.
Args:
scale: float
growth_factor: float
bucket_count: integer
"""
super(ExponentialBuckets, self).__init__(
pywrap_tfe.TFE_MonitoringNewExponentialBuckets(scale, growth_factor,
bucket_count))
class Sampler(Metric):
"""A stateful class for updating a cumulative histogram metric.
This class encapsulates a set of histograms (or a single histogram for a
label-less metric) configured with a list of increasing bucket boundaries.
Each histogram is identified by a tuple of labels. The class allows the
user to add a sample to each histogram value.
"""
__slots__ = []
def __init__(self, name, buckets, description, *labels):
"""Creates a new Sampler.
Args:
name: name of the new metric.
buckets: bucketing strategy of the new metric.
description: description of the new metric.
*labels: The label list of the new metric.
"""
super(Sampler, self).__init__('Sampler', _sampler_methods, len(labels),
name, buckets.buckets, description, *labels)
def get_cell(self, *labels):
"""Retrieves the cell."""
return SamplerCell(super(Sampler, self).get_cell(*labels))
# Keeping track of current MonitoredTimer sections to prevent repetitive
# counting.
MonitoredTimerSections = []
class MonitoredTimer(object):
"""A context manager to measure the walltime and increment a Counter cell."""
__slots__ = [
"cell",
"t",
"monitored_section_name",
"_counting",
"_avoid_repetitive_counting",
]
def __init__(
self, cell, monitored_section_name=None, avoid_repetitive_counting=False
):
"""Creates a new MonitoredTimer.
Args:
cell: the cell associated with the time metric that will be inremented.
monitored_section_name: name of action being monitored here.
avoid_repetitive_counting: when set to True, if already in a monitored
timer section with the same monitored_section_name, skip counting.
"""
self.cell = cell
self.monitored_section_name = monitored_section_name
self._avoid_repetitive_counting = avoid_repetitive_counting
self._counting = True
def __enter__(self):
if (
self._avoid_repetitive_counting
and self.monitored_section_name
and self.monitored_section_name in MonitoredTimerSections
):
self._counting = False
return self
self.t = time.time()
if self.monitored_section_name:
MonitoredTimerSections.append(self.monitored_section_name)
return self
def __exit__(self, exception_type, exception_value, traceback):
del exception_type, exception_value, traceback
if self._counting:
micro_seconds = (time.time() - self.t) * 1000000
self.cell.increase_by(int(micro_seconds))
if self.monitored_section_name:
MonitoredTimerSections.remove(self.monitored_section_name)
def monitored_timer(cell):
"""A function decorator for adding MonitoredTimer support.
Args:
cell: the cell associated with the time metric that will be inremented.
Returns:
A decorator that measure the function runtime and increment the specified
counter cell.
"""
def actual_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with MonitoredTimer(cell):
return func(*args, **kwargs)
return wrapper
return actual_decorator