api.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. #!/usr/bin/env python3
  2. # Copyright (c) Facebook, Inc. and its affiliates.
  3. # All rights reserved.
  4. #
  5. # This source code is licensed under the BSD-style license found in the
  6. # LICENSE file in the root directory of this source tree.
  7. import abc
  8. import time
  9. import warnings
  10. from collections import namedtuple
  11. from functools import wraps
  12. from typing import Dict, Optional
  13. __all__ = ['MetricsConfig', 'MetricHandler', 'ConsoleMetricHandler', 'NullMetricHandler', 'MetricStream',
  14. 'configure', 'getStream', 'prof', 'profile', 'put_metric', 'publish_metric', 'get_elapsed_time_ms',
  15. 'MetricData']
  16. MetricData = namedtuple("MetricData", ["timestamp", "group_name", "name", "value"])
  17. class MetricsConfig:
  18. __slots__ = ["params"]
  19. def __init__(self, params: Optional[Dict[str, str]] = None):
  20. self.params = params
  21. if self.params is None:
  22. self.params = {}
  23. class MetricHandler(abc.ABC):
  24. @abc.abstractmethod
  25. def emit(self, metric_data: MetricData):
  26. pass
  27. class ConsoleMetricHandler(MetricHandler):
  28. def emit(self, metric_data: MetricData):
  29. print(
  30. "[{}][{}]: {}={}".format(
  31. metric_data.timestamp,
  32. metric_data.group_name,
  33. metric_data.name,
  34. metric_data.value,
  35. )
  36. )
  37. class NullMetricHandler(MetricHandler):
  38. def emit(self, metric_data: MetricData):
  39. pass
  40. class MetricStream:
  41. def __init__(self, group_name: str, handler: MetricHandler):
  42. self.group_name = group_name
  43. self.handler = handler
  44. def add_value(self, metric_name: str, metric_value: int):
  45. self.handler.emit(
  46. MetricData(time.time(), self.group_name, metric_name, metric_value)
  47. )
  48. _metrics_map = {}
  49. _default_metrics_handler: MetricHandler = NullMetricHandler()
  50. # pyre-fixme[9]: group has type `str`; used as `None`.
  51. def configure(handler: MetricHandler, group: str = None):
  52. if group is None:
  53. global _default_metrics_handler
  54. # pyre-fixme[9]: _default_metrics_handler has type `NullMetricHandler`; used
  55. # as `MetricHandler`.
  56. _default_metrics_handler = handler
  57. else:
  58. _metrics_map[group] = handler
  59. def getStream(group: str):
  60. if group in _metrics_map:
  61. handler = _metrics_map[group]
  62. else:
  63. handler = _default_metrics_handler
  64. return MetricStream(group, handler)
  65. def _get_metric_name(fn):
  66. qualname = fn.__qualname__
  67. split = qualname.split(".")
  68. if len(split) == 1:
  69. module = fn.__module__
  70. if module:
  71. return module.split(".")[-1] + "." + split[0]
  72. else:
  73. return split[0]
  74. else:
  75. return qualname
  76. def prof(fn=None, group: str = "torchelastic"):
  77. r"""
  78. @profile decorator publishes duration.ms, count, success, failure
  79. metrics for the function that it decorates. The metric name defaults
  80. to the qualified name (``class_name.def_name``) of the function.
  81. If the function does not belong to a class, it uses the leaf module name
  82. instead.
  83. Usage
  84. ::
  85. @metrics.prof
  86. def x():
  87. pass
  88. @metrics.prof(group="agent")
  89. def y():
  90. pass
  91. """
  92. def wrap(f):
  93. @wraps(f)
  94. def wrapper(*args, **kwargs):
  95. key = _get_metric_name(f)
  96. try:
  97. start = time.time()
  98. result = f(*args, **kwargs)
  99. put_metric(f"{key}.success", 1, group)
  100. except Exception:
  101. put_metric(f"{key}.failure", 1, group)
  102. raise
  103. finally:
  104. put_metric(f"{key}.duration.ms", get_elapsed_time_ms(start), group)
  105. return result
  106. return wrapper
  107. if fn:
  108. return wrap(fn)
  109. else:
  110. return wrap
  111. def profile(group=None):
  112. """
  113. @profile decorator adds latency and success/failure metrics to any given function.
  114. Usage
  115. ::
  116. @metrics.profile("my_metric_group")
  117. def some_function(<arguments>):
  118. """
  119. warnings.warn("Deprecated, use @prof instead", DeprecationWarning)
  120. def wrap(func):
  121. @wraps(func)
  122. def wrapper(*args, **kwargs):
  123. try:
  124. start_time = time.time()
  125. result = func(*args, **kwargs)
  126. publish_metric(group, "{}.success".format(func.__name__), 1)
  127. except Exception:
  128. publish_metric(group, "{}.failure".format(func.__name__), 1)
  129. raise
  130. finally:
  131. publish_metric(
  132. group,
  133. "{}.duration.ms".format(func.__name__),
  134. get_elapsed_time_ms(start_time),
  135. )
  136. return result
  137. return wrapper
  138. return wrap
  139. def put_metric(metric_name: str, metric_value: int, metric_group: str = "torchelastic"):
  140. """
  141. Publishes a metric data point.
  142. Usage
  143. ::
  144. put_metric("metric_name", 1)
  145. put_metric("metric_name", 1, "metric_group_name")
  146. """
  147. getStream(metric_group).add_value(metric_name, metric_value)
  148. def publish_metric(metric_group: str, metric_name: str, metric_value: int):
  149. warnings.warn(
  150. "Deprecated, use put_metric(metric_group)(metric_name, metric_value) instead"
  151. )
  152. metric_stream = getStream(metric_group)
  153. metric_stream.add_value(metric_name, metric_value)
  154. def get_elapsed_time_ms(start_time_in_seconds: float):
  155. """
  156. Returns the elapsed time in millis from the given start time.
  157. """
  158. end_time = time.time()
  159. return int((end_time - start_time_in_seconds) * 1000)