- r"""Importing this file includes common utility methods for checking quantized
- tensors and modules.
- """
- import numpy as np
- import torch
- from contextlib import contextmanager
- from torch.testing._internal.common_utils import TEST_WITH_ASAN, TEST_WITH_TSAN, TEST_WITH_UBSAN, IS_PPC, IS_MACOS, IS_WINDOWS
- supported_qengines = torch.backends.quantized.supported_engines
- supported_qengines.remove('none')
- # Note: We currently do not run QNNPACK tests on WINDOWS and MACOS as it is flaky. Issue #29326
- # QNNPACK is not supported on PPC
- # QNNPACK throws ASAN heap-buffer-overflow error.
- if 'qnnpack' in supported_qengines and any([IS_PPC, TEST_WITH_ASAN, TEST_WITH_TSAN, TEST_WITH_UBSAN, IS_MACOS, IS_WINDOWS]):
- supported_qengines.remove('qnnpack')

def _conv_output_shape(input_size, kernel_size, padding, stride, dilation,
                       output_padding=0):
    """Computes the output shape given convolution parameters."""
    return np.floor((input_size + 2 * padding - kernel_size - (kernel_size - 1)
                     * (dilation - 1)) / stride) + 2 * output_padding + 1

# Quantization references
def _quantize(x, scale, zero_point, qmin=None, qmax=None, dtype=np.uint8):
    """Quantizes a numpy array."""
    if qmin is None:
        qmin = np.iinfo(dtype).min
    if qmax is None:
        qmax = np.iinfo(dtype).max
    qx = np.round(x / scale + zero_point).astype(np.int64)
    qx = np.clip(qx, qmin, qmax)
    qx = qx.astype(dtype)
    return qx

def _dequantize(qx, scale, zero_point):
    """Dequantizes a numpy array."""
    x = (qx.astype(float) - zero_point) * scale
    return x
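
# Example (illustrative sketch, kept as a comment so it does not run on import):
# round-tripping a float array through _quantize/_dequantize. The scale, zero
# point, and tolerance below are assumptions chosen for uint8, not values this
# module prescribes.
#
#   x = np.random.randn(4, 4).astype(np.float32)
#   qx = _quantize(x, scale=0.1, zero_point=128)        # np.uint8 values in [0, 255]
#   x_hat = _dequantize(qx, scale=0.1, zero_point=128)   # float reconstruction
#   assert np.allclose(x, x_hat, atol=0.1)               # error bounded by ~one quantization step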

def _requantize(x, multiplier, zero_point, qmin=0, qmax=255, qtype=np.uint8):
    """Requantizes a numpy array, i.e., intermediate int32 or int16 values are
    converted back to the given type."""
    qx = (x * multiplier).round() + zero_point
    qx = np.clip(qx, qmin, qmax).astype(qtype)
    return qx

def _calculate_dynamic_qparams(X, dtype, reduce_range=False, qscheme=torch.per_tensor_affine):
    """Calculate the dynamic quantization parameters (scale, zero_point)
    according to the min and max element of the tensor"""
    assert qscheme in (torch.per_tensor_affine, torch.per_tensor_symmetric)
    if qscheme == torch.per_tensor_symmetric:
        assert dtype == torch.qint8
    if isinstance(X, torch.Tensor):
        X = X.numpy()
    if dtype == torch.qint8:
        if reduce_range:
            qmin, qmax = -64, 63
        else:
            qmin, qmax = -128, 127
    else:  # dtype == torch.quint8
        if reduce_range:
            qmin, qmax = 0, 127
        else:
            qmin, qmax = 0, 255
    min_val = X.min()
    max_val = X.max()
    is_symmetric = (qscheme == torch.per_tensor_symmetric)
    if min_val == max_val:
        scale = 1.0
        zero_point = 0
    else:
        if is_symmetric:
            max_val = max(max_val, -min_val)
            min_val = -max_val
            scale = (max_val - min_val) / (qmax - qmin)
            scale = max(scale, np.finfo(np.float32).eps)
            zero_point = 0
        else:
            max_val = max(max_val, 0.0)
            min_val = min(min_val, 0.0)
            scale = (max_val - min_val) / (qmax - qmin)
            scale = max(scale, np.finfo(np.float32).eps)
            zero_point = qmin - round(min_val / scale)
            zero_point = max(qmin, zero_point)
            zero_point = min(qmax, zero_point)
    return [float(scale), int(zero_point)]
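
# Example (illustrative sketch, kept as a comment): computing dynamic qparams for
# a float tensor and feeding them to torch.quantize_per_tensor. The tensor shape
# and the torch.quint8 dtype are arbitrary choices for the illustration.
#
#   x = torch.randn(8, 8)
#   scale, zero_point = _calculate_dynamic_qparams(x, torch.quint8)
#   qx = torch.quantize_per_tensor(x, scale, zero_point, torch.quint8)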

def _calculate_dynamic_per_channel_qparams(X, dtype):
    """Calculate the dynamic quantization parameters (scale, zero_point)
    according to the min and max element of each channel of the tensor"""
    if isinstance(X, torch.Tensor):
        X = X.numpy()
    qmin, qmax = torch.iinfo(dtype).min, torch.iinfo(dtype).max
    n_levels = qmax - qmin
    scale = np.zeros(X.shape[0], dtype=np.float64)
    zero_point = np.zeros(X.shape[0], dtype=np.int64)
    for i in range(zero_point.shape[0]):
        min_val = X[i].min()
        max_val = X[i].max()
        if min_val == max_val:
            scale[i] = 1.0
            zero_point[i] = 0
        else:
            max_val = max(max_val, 0.0)
            min_val = min(min_val, 0.0)
            scale[i] = (max_val - min_val) / n_levels
            scale[i] = max(scale[i], np.finfo(np.float32).eps)
            zero_point[i] = qmin - round(min_val / scale[i])
            zero_point[i] = max(qmin, zero_point[i])
            zero_point[i] = min(qmax, zero_point[i])
    return scale, zero_point

def _snr(x, x_hat):
    """Calculates the signal to noise ratio and returns the signal and noise
    power, as well as the SNR in dB.
    If the input is a list/tuple this function is called recursively on each
    element. The result will have the same nested structure as the inputs.

    Args:
        x, x_hat: Either a tensor or a nested list/tuple of tensors.
    Returns:
        signal, noise, SNR(in dB): Either floats or a nested list of floats
    """
    if isinstance(x, (list, tuple)):
        assert len(x) == len(x_hat)
        res = []
        for idx in range(len(x)):
            res.append(_snr(x[idx], x_hat[idx]))
        return res
    if x_hat.is_quantized:
        x_hat = x_hat.dequantize()
    if x.is_quantized:
        x = x.dequantize()
    noise = (x - x_hat).norm()
    if noise == 0:
        return 0.0, float('inf'), float('inf')
    signal = x.norm()
    snr = signal / noise
    snr_db = 20 * snr.log10()
    return signal, noise, snr_db
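
# Example (illustrative sketch, kept as a comment): measuring how much signal a
# quantized tensor retains relative to its float original. The scale/zero_point
# values are assumptions; for this setup the SNR typically lands around 35-40 dB,
# but that figure is not a guarantee made by this module.
#
#   x = torch.randn(16, 16)
#   qx = torch.quantize_per_tensor(x, scale=0.05, zero_point=128, dtype=torch.quint8)
#   signal, noise, snr_db = _snr(x, qx)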

@contextmanager
def override_quantized_engine(qengine):
    previous = torch.backends.quantized.engine
    torch.backends.quantized.engine = qengine
    try:
        yield
    finally:
        torch.backends.quantized.engine = previous
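
# Example (illustrative sketch, kept as a comment): temporarily switching the
# quantized backend for a block of checks. 'fbgemm' is only present in x86
# builds, hence the guard on supported_qengines.
#
#   if 'fbgemm' in supported_qengines:
#       with override_quantized_engine('fbgemm'):
#           ...  # fbgemm-specific checks; the previous engine is restored on exit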

@contextmanager
def override_cpu_allocator_for_qnnpack(qengine_is_qnnpack):
    try:
        if qengine_is_qnnpack:
            torch._C._set_default_mobile_cpu_allocator()
        yield
    finally:
        if qengine_is_qnnpack:
            torch._C._unset_default_mobile_cpu_allocator()

# TODO: Update all quantization tests to use this decorator.
# Currently for some of the tests it seems to have inconsistent params
# for fbgemm vs qnnpack.
def override_qengines(qfunction):
    def test_fn(*args, **kwargs):
        for qengine in supported_qengines:
            with override_quantized_engine(qengine):
                # qfunction should not return anything.
                qfunction(*args, **kwargs)
    return test_fn
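
# Example (illustrative sketch, kept as a comment): a test decorated with
# override_qengines runs once per engine in supported_qengines. The class and
# test names below are hypothetical, not part of this module.
#
#   class TestQuantizedOps(TestCase):
#       @override_qengines
#       def test_some_quantized_op(self):
#           if qengine_is_qnnpack():
#               ...  # engine-specific expectations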

def qengine_is_fbgemm():
    return torch.backends.quantized.engine == 'fbgemm'

def qengine_is_qnnpack():
    return torch.backends.quantized.engine == 'qnnpack'

def qengine_is_onednn():
    return torch.backends.quantized.engine == 'onednn'

def qengine_is_x86():
    return torch.backends.quantized.engine == 'x86'

# Helper function used to simulate per-channel fake-quant against any axis
def _permute_to_axis_zero(X, axis):
    new_axis_list = list(range(X.dim()))
    new_axis_list[axis] = 0
    new_axis_list[0] = axis
    y = X.permute(tuple(new_axis_list))
    return y, new_axis_list

# Reference method for fake quantize
# Note: because scale/zero_point are left as float in the actual kernel, this mimics how fake_quant works for float16/64
def _fake_quantize_per_channel_affine_reference(X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    dtype = X.dtype
    X, permute_axis_list = _permute_to_axis_zero(X.to(torch.float32), axis)
    res = torch.zeros_like(X)
    for i in range(X.size()[0]):
        res[i] = (torch.clamp(torch.round(X[i] * (1.0 / per_channel_scale[i]) +
                  per_channel_zero_point[i]), quant_min, quant_max) - per_channel_zero_point[i]) * per_channel_scale[i]
    out = res.permute(tuple(permute_axis_list))
    return out.to(dtype)

# Reference method for the gradient of the fake quantize operator
# Note: because scale/zero_point are left as float in the actual kernel, this mimics how fake_quant works for float16/64
def _fake_quantize_per_channel_affine_grad_reference(dY, X, per_channel_scale, per_channel_zero_point, axis, quant_min, quant_max):
    dtype = X.dtype
    X, permute_axis_list = _permute_to_axis_zero(X.to(torch.float32), axis)
    Xq = torch.zeros_like(X)
    for i in range(X.size()[0]):
        Xq[i] = torch.round(X[i] * (1.0 / per_channel_scale[i]) + per_channel_zero_point[i])
    Xq = Xq.permute(tuple(permute_axis_list))
    mask = (Xq >= quant_min) * (Xq <= quant_max)
    res = torch.zeros_like(dY)
    res[mask] = dY[mask]
    return res.to(dtype)
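
# Example (illustrative sketch, kept as a comment): checking the reference against
# the torch.fake_quantize_per_channel_affine kernel. Shapes, ranges, and the
# tolerance are assumptions; the accepted zero_point dtype can differ across
# PyTorch versions.
#
#   X = torch.randn(4, 8)
#   scale = torch.rand(4) + 0.01
#   zero_point = torch.randint(0, 10, (4,), dtype=torch.int32)
#   ref = _fake_quantize_per_channel_affine_reference(X, scale, zero_point, axis=0,
#                                                     quant_min=0, quant_max=255)
#   out = torch.fake_quantize_per_channel_affine(X, scale, zero_point, 0, 0, 255)
#   assert torch.allclose(ref, out, atol=1e-6)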

def to_tensor(X, device):
    if not isinstance(X, torch.Tensor):
        X = torch.tensor(X)
    else:
        X = X.clone().detach()
    return X.to(device=torch.device(device), dtype=torch.float32)