import gzip
import json
import os
import tempfile
from enum import Enum
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from warnings import warn

import torch
import torch.autograd.profiler as prof
from torch._C._profiler import (
    _add_execution_graph_observer,
    _disable_execution_graph_observer,
    _enable_execution_graph_observer,
    _ExperimentalConfig,
    _remove_execution_graph_observer,
)
from torch.autograd import kineto_available, ProfilerActivity
from torch.profiler import _memory_profiler

__all__ = [
    "supported_activities",
    "ProfilerAction",
    "schedule",
    "tensorboard_trace_handler",
    "profile",
    "ExecutionGraphObserver",
]

PROFILER_STEP_NAME = "ProfilerStep"

def supported_activities():
    """
    Returns a set of supported profiler tracing activities.

    Note: the profiler uses the CUPTI library to trace on-device CUDA kernels.
    If CUDA is enabled but CUPTI is not available, passing
    ``ProfilerActivity.CUDA`` to the profiler falls back to the legacy CUDA
    profiling code (the same as in the legacy ``torch.autograd.profiler``).
    This, in turn, results in CUDA time being included in the profiler table
    output, but not in the JSON trace.
    """
    return torch.autograd._supported_activities()
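
# Example (a minimal sketch of how this helper is typically used; the
# fallback message below is illustrative, not part of the API):
#
#     from torch.profiler import ProfilerActivity, supported_activities
#
#     activities = supported_activities()
#     if ProfilerActivity.CUDA not in activities:
#         print("CUPTI/CUDA tracing unavailable; profiling CPU only")
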
class _KinetoProfile:
    """Low-level profiler that wraps the autograd profiler.

    Args:
        activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
            ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``.
            Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA.
        record_shapes (bool): save information about operators' input shapes.
        profile_memory (bool): track tensor memory allocation/deallocation.
        with_stack (bool): record source information (file and line number) for the ops.
        with_flops (bool): use a formula to estimate the FLOPs of specific operators
            (matrix multiplication and 2D convolution).
        with_modules (bool): record the module hierarchy (including function names)
            corresponding to the callstack of the op. E.g. if module A's forward calls
            module B's forward, which contains an aten::add op,
            then aten::add's module hierarchy is A.B.
            Note that this support exists, at the moment, only for TorchScript models
            and not eager mode models.
        experimental_config (_ExperimentalConfig): A set of experimental options
            used by profiler libraries like Kineto. Note, backward compatibility is not guaranteed.

    .. note::
        This API is experimental and subject to change in the future.

        Enabling shape and stack tracing results in additional overhead.
        When record_shapes=True is specified, the profiler will temporarily hold references to the tensors;
        that may further prevent certain optimizations that depend on the reference count and introduce
        extra tensor copies.
    """
    def __init__(
        self,
        *,
        activities: Optional[Iterable[ProfilerActivity]] = None,
        record_shapes: bool = False,
        profile_memory: bool = False,
        with_stack: bool = False,
        with_flops: bool = False,
        with_modules: bool = False,
        experimental_config: Optional[_ExperimentalConfig] = None,
    ):
        self.activities = set(activities) if activities else supported_activities()
        self.record_shapes = record_shapes
        self.with_flops = with_flops
        self.profile_memory = profile_memory
        self.with_stack = with_stack
        self.with_modules = with_modules
        self.experimental_config = experimental_config
        self.profiler: Optional[prof.profile] = None
    def start(self):
        self.prepare_trace()
        self.start_trace()

    def stop(self):
        self.stop_trace()
    def prepare_trace(self):
        self.profiler = prof.profile(
            use_cuda=(ProfilerActivity.CUDA in self.activities),
            use_cpu=(ProfilerActivity.CPU in self.activities),
            record_shapes=self.record_shapes,
            with_flops=self.with_flops,
            profile_memory=self.profile_memory,
            with_stack=self.with_stack,
            with_modules=self.with_modules,
            use_kineto=True,
            experimental_config=self.experimental_config,
        )
        self.profiler._prepare_trace()
    def start_trace(self):
        assert self.profiler is not None
        self.profiler._start_trace()

        if self.profile_memory:
            self.add_metadata_json("profile_memory", "1")
        if self.with_stack:
            self.add_metadata_json("with_stack", "1")
        if self.record_shapes:
            self.add_metadata_json("record_shapes", "1")
        if self.with_modules:
            self.add_metadata_json("with_modules", "1")
        if self.with_flops:
            self.add_metadata_json("with_flops", "1")

        if kineto_available():
            dist_info = self._get_distributed_info()
            if dist_info:
                self.add_metadata_json("distributedInfo", json.dumps(dist_info))
    def stop_trace(self):
        assert self.profiler is not None
        self.profiler.__exit__(None, None, None)
    def export_chrome_trace(self, path: str):
        """
        Exports the collected trace in Chrome JSON format. If ``path`` ends
        with ``.gz``, the trace is gzip-compressed.
        """
        assert self.profiler
        if path.endswith('.gz'):
            # Export to a temporary .json file first, then gzip it into place.
            fp = tempfile.NamedTemporaryFile('w+t', suffix='.json', delete=False)
            fp.close()
            retvalue = self.profiler.export_chrome_trace(fp.name)
            with open(fp.name) as fin:
                with gzip.open(path, 'wt') as fout:
                    fout.writelines(fin)
            os.remove(fp.name)
            return retvalue
        else:
            return self.profiler.export_chrome_trace(path)
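
    # Example (a sketch; assumes profiling already ran inside this object):
    #
    #     p = _KinetoProfile()
    #     p.start()
    #     ...  # workload to profile
    #     p.stop()
    #     p.export_chrome_trace("trace.json.gz")  # gzipped because of the suffix
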
    def export_stacks(self, path: str, metric: str = "self_cpu_time_total"):
        """Save stack traces to a file in a format suitable for visualization.

        Args:
            path (str): save stacks file to this location;
            metric (str): metric to use: "self_cpu_time_total" or "self_cuda_time_total"

        .. note::
            Example of using the FlameGraph tool:

            - git clone https://github.com/brendangregg/FlameGraph
            - cd FlameGraph
            - ./flamegraph.pl --title "CPU time" --countname "us." profiler.stacks > perf_viz.svg
        """
        assert self.profiler
        return self.profiler.export_stacks(path, metric)
    def key_averages(self, group_by_input_shape: bool = False, group_by_stack_n: int = 0):
        """Averages events, grouping them by operator name and (optionally) by input
        shapes and stack.

        .. note::
            To use the shape/stack functionality, make sure to set record_shapes/with_stack
            when creating the profiler context manager.
        """
        assert self.profiler
        return self.profiler.key_averages(group_by_input_shape, group_by_stack_n)
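
    # Example (a sketch; grouping by shape requires record_shapes=True at
    # construction time):
    #
    #     averages = p.key_averages(group_by_input_shape=True)
    #     print(averages.table(sort_by="self_cpu_time_total", row_limit=10))
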
    def events(self):
        """
        Returns the list of unaggregated profiler events, to be used in the
        trace callback or after profiling has finished.
        """
        assert self.profiler
        return self.profiler.function_events
    def add_metadata(self, key: str, value: str):
        """
        Adds a user-defined metadata entry with a string key and a string value
        into the trace file.
        """
        wrapped_value = "\"" + value.replace('"', '\\"') + "\""
        torch.autograd._add_metadata_json(key, wrapped_value)

    def add_metadata_json(self, key: str, value: str):
        """
        Adds a user-defined metadata entry with a string key and a valid JSON value
        into the trace file.
        """
        torch.autograd._add_metadata_json(key, value)
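
    # Example (a sketch): plain string values are quoted for you by
    # add_metadata, while add_metadata_json expects the value to already be
    # valid JSON:
    #
    #     p.add_metadata("run_name", "baseline")        # stored as "baseline"
    #     p.add_metadata_json("shard", '{"rank": 0}')   # stored as-is
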
    def _get_distributed_info(self):
        import torch.distributed as dist

        if not dist.is_available() or not dist.is_initialized():
            return None

        return {
            "backend": dist.get_backend(),
            "rank": dist.get_rank(),
            "world_size": dist.get_world_size(),
        }
    def _memory_profile(self) -> _memory_profiler.MemoryProfile:
        required = ("record_shapes", "profile_memory", "with_stack")
        missing = [f"{i}=True" for i in required if not getattr(self, i)]
        if missing:
            raise ValueError(f"{', '.join(missing)} required for memory profiling.")

        assert self.profiler is not None and self.profiler.kineto_results is not None
        return _memory_profiler.MemoryProfile(self.profiler.kineto_results)
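
    # Example (a sketch): memory profiling needs all three flags at
    # construction time, otherwise _memory_profile() raises ValueError:
    #
    #     p = _KinetoProfile(record_shapes=True, profile_memory=True, with_stack=True)
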
class ProfilerAction(Enum):
    """
    Profiler actions that can be taken at the specified intervals
    """
    NONE = 0
    WARMUP = 1
    RECORD = 2
    RECORD_AND_SAVE = 3

def schedule(*, wait: int, warmup: int, active: int, repeat: int = 0, skip_first: int = 0) -> Callable:
    """
    Returns a callable that can be used as the profiler ``schedule`` argument. The profiler
    will skip the first ``skip_first`` steps, then wait for ``wait`` steps, then do the warmup
    for the next ``warmup`` steps, then do the active recording for the next ``active`` steps,
    and then repeat the cycle starting with ``wait`` steps. The optional number of cycles is
    specified with the ``repeat`` parameter; a value of zero means that the cycles will
    continue until the profiling is finished.
    """
    def schedule_fn(step: int) -> ProfilerAction:
        assert step >= 0
        if step < skip_first:
            return ProfilerAction.NONE
        else:
            step -= skip_first
        num_steps = wait + warmup + active
        if repeat > 0 and step / num_steps >= repeat:
            return ProfilerAction.NONE
        mod_step = step % num_steps
        if mod_step < wait:
            return ProfilerAction.NONE
        elif mod_step < wait + warmup:
            return ProfilerAction.WARMUP
        else:
            return ProfilerAction.RECORD if mod_step < num_steps - 1 \
                else ProfilerAction.RECORD_AND_SAVE
    assert wait >= 0 and warmup >= 0 and active > 0 and \
        repeat >= 0 and skip_first >= 0, "Invalid profiler schedule arguments"
    if warmup == 0:
        warn("Profiler won't be using warmup, this can skew profiler results")
    return schedule_fn
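
# Worked example (a sketch): with skip_first=1, wait=1, warmup=1, active=2,
# repeat=1, the per-step actions produced by the returned callable are:
#
#     sched = schedule(skip_first=1, wait=1, warmup=1, active=2, repeat=1)
#     [sched(s) for s in range(6)]
#     # step 0 -> NONE (skipped), step 1 -> NONE (wait), step 2 -> WARMUP,
#     # step 3 -> RECORD, step 4 -> RECORD_AND_SAVE, step 5 -> NONE (repeat exhausted)
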
def _default_schedule_fn(_: int) -> ProfilerAction:
    """
    Default profiler behavior - immediately starts recording the events,
    keeps doing it on every profiler step.
    """
    return ProfilerAction.RECORD

def tensorboard_trace_handler(dir_name: str, worker_name: Optional[str] = None, use_gzip: bool = False):
    """
    Outputs tracing files to the ``dir_name`` directory, which can then be
    passed directly to TensorBoard as a logdir.
    ``worker_name`` should be unique for each worker in a distributed scenario;
    by default it is set to '[hostname]_[pid]'.
    """
    import os
    import socket
    import time

    def handler_fn(prof) -> None:
        nonlocal worker_name
        if not os.path.isdir(dir_name):
            try:
                os.makedirs(dir_name, exist_ok=True)
            except Exception as e:
                raise RuntimeError("Can't create directory: " + dir_name) from e
        if not worker_name:
            worker_name = "{}_{}".format(socket.gethostname(), str(os.getpid()))
        file_name = "{}.{}.pt.trace.json".format(worker_name, int(time.time() * 1000))
        if use_gzip:
            file_name = file_name + '.gz'
        prof.export_chrome_trace(os.path.join(dir_name, file_name))
    return handler_fn
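
# Example (a sketch; './log' is an arbitrary output directory):
#
#     handler = tensorboard_trace_handler("./log")
#     # pass as on_trace_ready=handler to torch.profiler.profile(...),
#     # then inspect with: tensorboard --logdir ./log
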
class profile(_KinetoProfile):
    """Profiler context manager.

    Args:
        activities (iterable): list of activity groups (CPU, CUDA) to use in profiling, supported values:
            ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``.
            Default value: ProfilerActivity.CPU and (when available) ProfilerActivity.CUDA.
        schedule (Callable): callable that takes step (int) as a single parameter and returns
            the ``ProfilerAction`` value that specifies the profiler action to perform at each step.
        on_trace_ready (Callable): callable that is called at each step when ``schedule``
            returns ``ProfilerAction.RECORD_AND_SAVE`` during the profiling.
        record_shapes (bool): save information about operators' input shapes.
        profile_memory (bool): track tensor memory allocation/deallocation.
        with_stack (bool): record source information (file and line number) for the ops.
        with_flops (bool): use a formula to estimate the FLOPs (floating point operations) of specific operators
            (matrix multiplication and 2D convolution).
        with_modules (bool): record the module hierarchy (including function names)
            corresponding to the callstack of the op. E.g. if module A's forward calls
            module B's forward, which contains an aten::add op,
            then aten::add's module hierarchy is A.B.
            Note that this support exists, at the moment, only for TorchScript models
            and not eager mode models.
        experimental_config (_ExperimentalConfig): A set of experimental options
            used for Kineto library features. Note, backward compatibility is not guaranteed.
        use_cuda (bool):
            .. deprecated:: 1.8.1
                use ``activities`` instead.

    .. note::
        Use :func:`~torch.profiler.schedule` to generate the callable schedule.
        Non-default schedules are useful when profiling long training jobs
        and allow the user to obtain multiple traces at different iterations
        of the training process.
        The default schedule simply records all the events continuously for the
        duration of the context manager.

    .. note::
        Use :func:`~torch.profiler.tensorboard_trace_handler` to generate result files for TensorBoard:

        ``on_trace_ready=torch.profiler.tensorboard_trace_handler(dir_name)``

        After profiling, result files can be found in the specified directory. Use the command:

        ``tensorboard --logdir dir_name``

        to see the results in TensorBoard.
        For more information, see
        `PyTorch Profiler TensorBoard Plugin <https://github.com/pytorch/kineto/tree/master/tb_plugin>`__

    .. note::
        Enabling shape and stack tracing results in additional overhead.
        When record_shapes=True is specified, the profiler will temporarily hold references to the tensors;
        that may further prevent certain optimizations that depend on the reference count and introduce
        extra tensor copies.

    Examples:

    .. code-block:: python

        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ]
        ) as p:
            code_to_profile()
        print(p.key_averages().table(
            sort_by="self_cuda_time_total", row_limit=-1))

    Using the profiler's ``schedule``, ``on_trace_ready`` and ``step`` functions:

    .. code-block:: python

        # Non-default profiler schedule allows user to turn profiler on and off
        # on different iterations of the training loop;
        # trace_handler is called every time a new trace becomes available
        def trace_handler(prof):
            print(prof.key_averages().table(
                sort_by="self_cuda_time_total", row_limit=-1))
            # prof.export_chrome_trace("/tmp/test_trace_" + str(prof.step_num) + ".json")

        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            # In this example with wait=1, warmup=1, active=2, repeat=1,
            # profiler will skip the first step/iteration,
            # start warming up on the second, record
            # the third and the fourth iterations,
            # after which the trace will become available
            # and on_trace_ready (when set) is called;
            # the cycle repeats starting with the next step
            schedule=torch.profiler.schedule(
                wait=1,
                warmup=1,
                active=2,
                repeat=1),
            on_trace_ready=trace_handler
            # on_trace_ready=torch.profiler.tensorboard_trace_handler('./log')
            # used when outputting for tensorboard
        ) as p:
            for iter in range(N):
                code_iteration_to_profile(iter)
                # send a signal to the profiler that the next iteration has started
                p.step()
    """
    def __init__(
        self,
        *,
        activities: Optional[Iterable[ProfilerActivity]] = None,
        schedule: Optional[Callable[[int], ProfilerAction]] = None,
        on_trace_ready: Optional[Callable[..., Any]] = None,
        record_shapes: bool = False,
        profile_memory: bool = False,
        with_stack: bool = False,
        with_flops: bool = False,
        with_modules: bool = False,
        experimental_config: Optional[_ExperimentalConfig] = None,
        # deprecated:
        use_cuda: Optional[bool] = None,
    ):
        activities_set = set(activities) if activities else supported_activities()
        if use_cuda is not None:
            warn("use_cuda is deprecated, use activities argument instead")
            if use_cuda:
                activities_set.add(ProfilerActivity.CUDA)
            elif ProfilerActivity.CUDA in activities_set:
                activities_set.remove(ProfilerActivity.CUDA)
        assert len(activities_set) > 0, "No valid profiler activities found"

        super().__init__(
            activities=activities,
            record_shapes=record_shapes,
            profile_memory=profile_memory,
            with_stack=with_stack,
            with_flops=with_flops,
            with_modules=with_modules,
            experimental_config=experimental_config,
        )

        if schedule:
            self.schedule = schedule
            # add step markers into the trace and table view
            self.record_steps = True
        else:
            self.schedule = _default_schedule_fn
            self.record_steps = False
        self.on_trace_ready = on_trace_ready
        self.step_num = 0
        self.current_action = self.schedule(self.step_num)
        self.step_rec_fn: Optional[prof.record_function] = None

        self.action_map: Dict[Tuple[ProfilerAction, Optional[ProfilerAction]], List[Any]] = {
            # key is (prev_action, current_action), value is action list corresponding to the state pair.
            (ProfilerAction.NONE, ProfilerAction.NONE): [],
            (ProfilerAction.NONE, ProfilerAction.WARMUP): [self.prepare_trace],
            (ProfilerAction.NONE, ProfilerAction.RECORD): [self.prepare_trace, self.start_trace],
            (ProfilerAction.NONE, ProfilerAction.RECORD_AND_SAVE): [self.prepare_trace, self.start_trace],
            (ProfilerAction.WARMUP, ProfilerAction.NONE): [
                partial(warn, "Incorrect schedule: WARMUP followed by NONE"),
                self.start_trace,
                self.stop_trace],
            (ProfilerAction.WARMUP, ProfilerAction.WARMUP): [],
            (ProfilerAction.WARMUP, ProfilerAction.RECORD): [self.start_trace],
            (ProfilerAction.WARMUP, ProfilerAction.RECORD_AND_SAVE): [self.start_trace],
            (ProfilerAction.RECORD, ProfilerAction.NONE): [
                partial(warn, "Incorrect schedule: RECORD followed by NONE"),
                self.stop_trace],
            (ProfilerAction.RECORD, ProfilerAction.WARMUP): [
                partial(warn, "Incorrect schedule: RECORD followed by WARMUP"),
                self.stop_trace],
            (ProfilerAction.RECORD, ProfilerAction.RECORD): [],
            (ProfilerAction.RECORD, ProfilerAction.RECORD_AND_SAVE): [],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.NONE): [self.stop_trace, self._trace_ready],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.WARMUP): [self.stop_trace, self._trace_ready, self.prepare_trace],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD): [
                self.stop_trace,
                self._trace_ready,
                self.prepare_trace,
                self.start_trace],
            (ProfilerAction.RECORD_AND_SAVE, ProfilerAction.RECORD_AND_SAVE): [
                self.stop_trace,
                self._trace_ready,
                self.prepare_trace,
                self.start_trace],
            # used for exit action
            (ProfilerAction.WARMUP, None): [self.start_trace, self.stop_trace],
            (ProfilerAction.RECORD, None): [self.stop_trace, self._trace_ready],
            (ProfilerAction.RECORD_AND_SAVE, None): [self.stop_trace, self._trace_ready],
        }
        # Start tracking increments to profiler step, this will be used
        # by Kineto
        prof.KinetoStepTracker.init_step_count(PROFILER_STEP_NAME)
    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        prof.KinetoStepTracker.erase_step_count(PROFILER_STEP_NAME)
    def start(self):
        self._transit_action(ProfilerAction.NONE, self.current_action)
        if self.record_steps:
            self.step_rec_fn = prof.record_function("ProfilerStep#" + str(self.step_num))
            self.step_rec_fn.__enter__()

    def stop(self):
        if self.record_steps and self.step_rec_fn:
            self.step_rec_fn.__exit__(None, None, None)
        self._transit_action(self.current_action, None)
    def step(self):
        """
        Signals the profiler that the next profiling step has started.
        """
        if self.record_steps and self.step_rec_fn:
            self.step_rec_fn.__exit__(None, None, None)
        prev_action = self.current_action
        cur_step = self.step_num
        self.step_num += 1
        self.current_action = self.schedule(self.step_num)

        self._transit_action(prev_action, self.current_action)
        prof.KinetoStepTracker.increment_step(PROFILER_STEP_NAME)

        if self.record_steps:
            self.step_rec_fn = prof.record_function("ProfilerStep#" + str(cur_step))
            self.step_rec_fn.__enter__()
    def _trace_ready(self):
        if self.on_trace_ready:
            self.on_trace_ready(self)

    def _transit_action(self, prev_action, current_action):
        action_list = self.action_map.get((prev_action, current_action))
        if action_list:
            for action in action_list:
                action()
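
    # Example (a sketch): besides the context-manager form shown in the class
    # docstring, profile can be driven manually with start()/stop():
    #
    #     p = profile(activities=[ProfilerActivity.CPU])
    #     p.start()
    #     ...  # workload to profile
    #     p.stop()
    #     print(p.key_averages().table(sort_by="self_cpu_time_total"))
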
class ExecutionGraphObserver:
    """Execution Graph Observer

    Each process can have a single ExecutionGraphObserver instance. The observer
    can be added to the record function callbacks by calling register_callback()
    explicitly. Without calling unregister_callback(), repeated calls to
    register_callback() will not add additional observers to the record function
    callbacks. Once an ExecutionGraphObserver is created, the start() and stop()
    methods control when the event data is recorded.

    Deleting the observer or calling unregister_callback() removes it from the
    record function callbacks, finalizes the output file, and stops
    incurring any overhead.
    """
    def __init__(self):
        """
        Initializes the default states.
        """
        self._registered = False
        self._execution_graph_running = False

    def __del__(self):
        """
        Calls unregister_callback() to make sure the outputs are finalized.
        """
        self.unregister_callback()

    def register_callback(self, output_file_path: str):
        """
        Adds the EG observer to the record function callbacks. The data will be
        written to output_file_path.
        """
        if not self._registered:
            self._output_file_path = output_file_path
            self._registered = _add_execution_graph_observer(output_file_path)

    def unregister_callback(self):
        """
        Removes the EG observer from the record function callbacks.
        """
        if self._registered:
            self.stop()
            _remove_execution_graph_observer()
            self._registered = False

    @property
    def is_registered(self):
        """
        Returns whether the execution graph observer is registered.
        """
        return self._registered

    def start(self):
        """
        Starts capturing.
        """
        if self._registered and not self._execution_graph_running:
            _enable_execution_graph_observer()
            self._execution_graph_running = True

    def stop(self):
        """
        Stops capturing.
        """
        if self._execution_graph_running:
            _disable_execution_graph_observer()
            self._execution_graph_running = False

    def get_output_file_path(self) -> str:
        """
        Returns the output file name.
        """
        if self.is_registered:
            return self._output_file_path
        else:
            raise RuntimeError(
                "A callback to the EG profiler needs to be registered "
                "first before getting the output file path"
            )
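
# Example (a sketch; "eg.json" is an arbitrary output path):
#
#     eg = ExecutionGraphObserver()
#     eg.register_callback("eg.json")
#     eg.start()
#     ...  # workload whose execution graph should be captured
#     eg.stop()
#     eg.unregister_callback()  # finalizes "eg.json"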