import torch._C


def format_time(time_us=None, time_ms=None, time_s=None):
    '''Format a duration, given in exactly one of microseconds, milliseconds or seconds, as a human-readable string.'''
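    # For illustration (hypothetical values): format_time(time_ms=1500.0) -> '1.500s',
    # format_time(time_us=42.0) -> '42.000us'.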
    assert sum([time_us is not None, time_ms is not None, time_s is not None]) == 1
    US_IN_SECOND = 1e6
    US_IN_MS = 1e3
    if time_us is None:
        if time_ms is not None:
            time_us = time_ms * US_IN_MS
        elif time_s is not None:
            time_us = time_s * US_IN_SECOND
        else:
            raise AssertionError("Shouldn't reach here :)")
    if time_us >= US_IN_SECOND:
        return '{:.3f}s'.format(time_us / US_IN_SECOND)
    if time_us >= US_IN_MS:
        return '{:.3f}ms'.format(time_us / US_IN_MS)
    return '{:.3f}us'.format(time_us)


class ExecutionStats:
    def __init__(self, c_stats, benchmark_config):
        self._c_stats = c_stats
        self.benchmark_config = benchmark_config

    @property
    def latency_avg_ms(self):
        return self._c_stats.latency_avg_ms

    @property
    def num_iters(self):
        return self._c_stats.num_iters

    @property
    def iters_per_second(self):
        '''
        Returns the total number of iterations per second across all calling threads.
        '''
        return self.num_iters / self.total_time_seconds

    @property
    def total_time_seconds(self):
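        # num_iters is the total across all calling threads, which run in parallel,
        # so the wall-clock estimate is the per-thread share of iterations times the
        # average per-iteration latency, converted from milliseconds to seconds.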
        return self.num_iters * (
            self.latency_avg_ms / 1000.0) / self.benchmark_config.num_calling_threads

    def __str__(self):
        return '\n'.join([
            "Average latency per example: " + format_time(time_ms=self.latency_avg_ms),
            "Total number of iterations: {}".format(self.num_iters),
            "Total number of iterations per second (across all threads): {:.2f}".format(self.iters_per_second),
            "Total time: " + format_time(time_s=self.total_time_seconds)
        ])


class ThroughputBenchmark:
    '''
    This class is a wrapper around a C++ component throughput_benchmark::ThroughputBenchmark
    responsible for executing a PyTorch module (nn.Module or ScriptModule)
    under an inference-server-like load. It can emulate multiple calling threads
    for a single provided module. In the future we plan to enhance this component
    to support inter- and intra-op parallelism as well as multiple models
    running in a single process.

    Please note that even though nn.Module is supported, it might incur an overhead
    from the need to hold the GIL every time we execute Python code or pass around
    inputs as Python objects. Once you have a ScriptModule version of your model
    for inference deployment, it is better to switch to using it in this benchmark.

    Example::

        >>> # xdoctest: +SKIP("undefined vars")
        >>> from torch.utils import ThroughputBenchmark
        >>> bench = ThroughputBenchmark(my_module)
        >>> # Pre-populate benchmark's data set with the inputs
        >>> for input in inputs:
        ...     # Both args and kwargs work, same as any PyTorch Module / ScriptModule
        ...     bench.add_input(input[0], x2=input[1])
        >>> # Inputs supplied above are randomly used during the execution
        >>> stats = bench.benchmark(
        ...     num_calling_threads=4,
        ...     num_warmup_iters=100,
        ...     num_iters=1000,
        ... )
        >>> print("Avg latency (ms): {}".format(stats.latency_avg_ms))
        >>> print("Number of iterations: {}".format(stats.num_iters))
    '''

    def __init__(self, module):
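        # For a ScriptModule, hand the underlying C++ module to the benchmark so
        # execution stays in C++; a plain nn.Module is wrapped as-is, and each call
        # re-enters Python (and takes the GIL), as noted in the class docstring.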
        if isinstance(module, torch.jit.ScriptModule):
            self._benchmark = torch._C.ThroughputBenchmark(module._c)
        else:
            self._benchmark = torch._C.ThroughputBenchmark(module)

    def run_once(self, *args, **kwargs):
        '''
        Run the module once on the supplied args/kwargs and return its prediction.
        This is useful for testing that the benchmark actually runs the module you
        want it to run. The inputs are forwarded to the module in the same way as
        the inputs registered via the add_input() method.
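
        Example (a minimal sketch; ``my_module`` and the tensor shape are placeholders)::

            >>> # xdoctest: +SKIP("undefined vars")
            >>> bench = ThroughputBenchmark(my_module)
            >>> prediction = bench.run_once(torch.randn(1, 8))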
        '''
        return self._benchmark.run_once(*args, **kwargs)

    def add_input(self, *args, **kwargs):
        '''
        Store a single input to the module in the benchmark's memory and keep it
        there. During the benchmark execution every thread is going to pick up a
        random input from all the inputs ever supplied to the benchmark via
        this function.
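
        Example (a minimal sketch; ``bench``, the tensor shapes and the ``mask`` kwarg
        are placeholders for whatever the wrapped module expects)::

            >>> # xdoctest: +SKIP("undefined vars")
            >>> bench.add_input(torch.randn(1, 8), mask=torch.ones(1, 8))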
        '''
        self._benchmark.add_input(*args, **kwargs)

    def benchmark(
            self,
            num_calling_threads=1,
            num_warmup_iters=10,
            num_iters=100,
            profiler_output_path=""):
        '''
        Args:
            num_warmup_iters (int): Warmup iters are used to make sure we run a module
                a few times before actually measuring things. This way we avoid cold
                caches and any other similar problems. This is the number of warmup
                iterations each thread performs separately.

            num_iters (int): Number of iterations the benchmark should run with.
                This number is separate from the warmup iterations. It is also
                shared across all the threads: once num_iters iterations have been
                reached across all the threads, execution stops, though the total
                number of iterations might end up slightly larger. That total is
                reported as stats.num_iters, where stats is the result of this function.

            profiler_output_path (str): Location to save the Autograd Profiler trace.
                If not empty, the Autograd Profiler will be enabled for the main benchmark
                execution (but not the warmup phase). The full trace will be saved
                to the file path provided by this argument.

        This function returns a BenchmarkExecutionStats object, which is defined via pybind11.
        It currently has two fields:

            - num_iters - the number of actual iterations the benchmark has made
            - latency_avg_ms - the average time it took to infer on one input example, in milliseconds
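
        Example (a minimal sketch, following the doctest convention used in the class
        docstring; ``bench`` is assumed to have been populated via add_input())::

            >>> # xdoctest: +SKIP("undefined vars")
            >>> stats = bench.benchmark(
            ...     num_calling_threads=4,
            ...     num_warmup_iters=100,
            ...     num_iters=1000,
            ... )
            >>> print(stats)  # uses ExecutionStats.__str__ defined above
            >>> stats.latency_avg_ms, stats.num_iters  # or inspect individual fields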
        '''
        config = torch._C.BenchmarkConfig()
        config.num_calling_threads = num_calling_threads
        config.num_warmup_iters = num_warmup_iters
        config.num_iters = num_iters
        config.profiler_output_path = profiler_output_path
        c_stats = self._benchmark.benchmark(config)
        return ExecutionStats(c_stats, config)