import inspect
import math
import os
import re

import numpy as np
import PIL.Image
import pytest
import torch

from common_utils import assert_close, cache, cpu_and_cuda, needs_cuda, set_rng_seed
from torch.utils._pytree import tree_map
from torchvision import tv_tensors
from torchvision.transforms.functional import _get_perspective_coeffs
from torchvision.transforms.v2 import functional as F
from torchvision.transforms.v2._utils import is_pure_tensor
from torchvision.transforms.v2.functional._geometry import _center_crop_compute_padding
from torchvision.transforms.v2.functional._meta import clamp_bounding_boxes, convert_bounding_box_format

from transforms_v2_dispatcher_infos import DISPATCHER_INFOS
from transforms_v2_kernel_infos import KERNEL_INFOS
from transforms_v2_legacy_utils import (
    DEFAULT_SQUARE_SPATIAL_SIZE,
    make_multiple_bounding_boxes,
    parametrized_error_message,
)

KERNEL_INFOS_MAP = {info.kernel: info for info in KERNEL_INFOS}
DISPATCHER_INFOS_MAP = {info.dispatcher: info for info in DISPATCHER_INFOS}


@cache
def script(fn):
    try:
        return torch.jit.script(fn)
    except Exception as error:
        raise AssertionError(f"Trying to `torch.jit.script` '{fn.__name__}' raised the error above.") from error


# Scripting a function often triggers a warning like
# `UserWarning: operator() profile_node %$INT1 : int[] = prim::profile_ivalue($INT2) does not have profile information`
# with varying `INT1` and `INT2`. Since these are uninteresting for us and only clutter the test summary, we ignore
# them.
ignore_jit_warning_no_profile = pytest.mark.filterwarnings(
    f"ignore:{re.escape('operator() profile_node %')}:UserWarning"
)


def make_info_args_kwargs_params(info, *, args_kwargs_fn, test_id=None):
    args_kwargs = list(args_kwargs_fn(info))
    if not args_kwargs:
        raise pytest.UsageError(
            f"Couldn't collect a single `ArgsKwargs` for `{info.id}`{f' in {test_id}' if test_id else ''}"
        )
    idx_field_len = len(str(len(args_kwargs)))
    return [
        pytest.param(
            info,
            args_kwargs_,
            marks=info.get_marks(test_id, args_kwargs_) if test_id else [],
            id=f"{info.id}-{idx:0{idx_field_len}}",
        )
        for idx, args_kwargs_ in enumerate(args_kwargs)
    ]
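

# For example, an info with id "resize_image" that yields twelve `ArgsKwargs` produces pytest params
# with ids "resize_image-00" through "resize_image-11", zero-padded to the width of the largest index.

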
def make_info_args_kwargs_parametrization(infos, *, args_kwargs_fn):
    def decorator(test_fn):
        parts = test_fn.__qualname__.split(".")
        if len(parts) == 1:
            test_class_name = None
            test_function_name = parts[0]
        elif len(parts) == 2:
            test_class_name, test_function_name = parts
        else:
            raise pytest.UsageError("Unable to parse the test class name and test function name from test function")
        test_id = (test_class_name, test_function_name)

        argnames = ("info", "args_kwargs")
        argvalues = []
        for info in infos:
            argvalues.extend(make_info_args_kwargs_params(info, args_kwargs_fn=args_kwargs_fn, test_id=test_id))

        return pytest.mark.parametrize(argnames, argvalues)(test_fn)

    return decorator
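

# The decorator produced above is used like a regular `pytest.mark.parametrize`, e.g.:
#
#   @make_info_args_kwargs_parametrization(
#       KERNEL_INFOS,
#       args_kwargs_fn=lambda info: info.sample_inputs_fn(),
#   )
#   def test_foo(self, info, args_kwargs): ...

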
@pytest.fixture(autouse=True)
def fix_rng_seed():
    set_rng_seed(0)
    yield


@pytest.fixture()
def test_id(request):
    test_class_name = request.cls.__name__ if request.cls is not None else None
    test_function_name = request.node.originalname
    return test_class_name, test_function_name


class TestKernels:
    sample_inputs = make_info_args_kwargs_parametrization(
        KERNEL_INFOS,
        args_kwargs_fn=lambda kernel_info: kernel_info.sample_inputs_fn(),
    )
    reference_inputs = make_info_args_kwargs_parametrization(
        [info for info in KERNEL_INFOS if info.reference_fn is not None],
        args_kwargs_fn=lambda info: info.reference_inputs_fn(),
    )

    @make_info_args_kwargs_parametrization(
        [info for info in KERNEL_INFOS if info.logs_usage],
        args_kwargs_fn=lambda info: info.sample_inputs_fn(),
    )
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_logging(self, spy_on, info, args_kwargs, device):
        spy = spy_on(torch._C._log_api_usage_once)

        (input, *other_args), kwargs = args_kwargs.load(device)
        info.kernel(input.as_subclass(torch.Tensor), *other_args, **kwargs)

        spy.assert_any_call(f"{info.kernel.__module__}.{info.id}")

    @ignore_jit_warning_no_profile
    @sample_inputs
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_scripted_vs_eager(self, test_id, info, args_kwargs, device):
        kernel_eager = info.kernel
        kernel_scripted = script(kernel_eager)

        (input, *other_args), kwargs = args_kwargs.load(device)
        input = input.as_subclass(torch.Tensor)

        actual = kernel_scripted(input, *other_args, **kwargs)
        expected = kernel_eager(input, *other_args, **kwargs)

        assert_close(
            actual,
            expected,
            **info.get_closeness_kwargs(test_id, dtype=input.dtype, device=input.device),
            msg=parametrized_error_message(input, *other_args, **kwargs),
        )

    def _unbatch(self, batch, *, data_dims):
        if isinstance(batch, torch.Tensor):
            batched_tensor = batch
            metadata = ()
        else:
            batched_tensor, *metadata = batch

        if batched_tensor.ndim == data_dims:
            return batch

        return [
            self._unbatch(unbatched, data_dims=data_dims)
            for unbatched in (
                batched_tensor.unbind(0) if not metadata else [(t, *metadata) for t in batched_tensor.unbind(0)]
            )
        ]
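
    # For example, with `data_dims=3` a batched image of shape (2, 3, H, W) unbatches into a list of
    # two (3, H, W) tensors, and a (2, 5, 3, H, W) batch into a nested list of lists. Any metadata
    # returned next to the tensor is replicated onto every unbatched element.
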
    @sample_inputs
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_batched_vs_single(self, test_id, info, args_kwargs, device):
        (batched_input, *other_args), kwargs = args_kwargs.load(device)

        tv_tensor_type = tv_tensors.Image if is_pure_tensor(batched_input) else type(batched_input)
        # This dictionary contains the number of rightmost dimensions that contain the actual data.
        # Everything to the left is considered a batch dimension.
        data_dims = {
            tv_tensors.Image: 3,
            tv_tensors.BoundingBoxes: 1,
            # `Mask`'s are special in the sense that the data dimensions depend on the type of mask. For detection
            # masks it is 3 `(*, N, H, W)`, but for segmentation masks it is 2 `(*, H, W)`. Since both are grouped
            # under one type, all kernels should also work without differentiating between the two. Thus, we go with
            # 2 here as the common ground.
            tv_tensors.Mask: 2,
            tv_tensors.Video: 4,
        }.get(tv_tensor_type)
        if data_dims is None:
            raise pytest.UsageError(
                f"The number of data dimensions cannot be determined for input of type {tv_tensor_type.__name__}."
            ) from None
        elif batched_input.ndim <= data_dims:
            pytest.skip("Input is not batched.")
        elif not all(batched_input.shape[:-data_dims]):
            pytest.skip("Input has a degenerate batch shape.")

        batched_input = batched_input.as_subclass(torch.Tensor)
        batched_output = info.kernel(batched_input, *other_args, **kwargs)
        actual = self._unbatch(batched_output, data_dims=data_dims)

        single_inputs = self._unbatch(batched_input, data_dims=data_dims)
        expected = tree_map(lambda single_input: info.kernel(single_input, *other_args, **kwargs), single_inputs)

        assert_close(
            actual,
            expected,
            **info.get_closeness_kwargs(test_id, dtype=batched_input.dtype, device=batched_input.device),
            msg=parametrized_error_message(batched_input, *other_args, **kwargs),
        )

    @sample_inputs
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_no_inplace(self, info, args_kwargs, device):
        (input, *other_args), kwargs = args_kwargs.load(device)
        input = input.as_subclass(torch.Tensor)

        if input.numel() == 0:
            pytest.skip("The input has a degenerate shape.")

        input_version = input._version
        info.kernel(input, *other_args, **kwargs)

        assert input._version == input_version

    @sample_inputs
    @needs_cuda
    def test_cuda_vs_cpu(self, test_id, info, args_kwargs):
        (input_cpu, *other_args), kwargs = args_kwargs.load("cpu")
        input_cpu = input_cpu.as_subclass(torch.Tensor)
        input_cuda = input_cpu.to("cuda")

        output_cpu = info.kernel(input_cpu, *other_args, **kwargs)
        output_cuda = info.kernel(input_cuda, *other_args, **kwargs)

        assert_close(
            output_cuda,
            output_cpu,
            check_device=False,
            **info.get_closeness_kwargs(test_id, dtype=input_cuda.dtype, device=input_cuda.device),
            msg=parametrized_error_message(input_cpu, *other_args, **kwargs),
        )

    @sample_inputs
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_dtype_and_device_consistency(self, info, args_kwargs, device):
        (input, *other_args), kwargs = args_kwargs.load(device)
        input = input.as_subclass(torch.Tensor)

        output = info.kernel(input, *other_args, **kwargs)
        # Most kernels just return a tensor, but some also return some additional metadata
        if not isinstance(output, torch.Tensor):
            output, *_ = output

        assert output.dtype == input.dtype
        assert output.device == input.device

    @reference_inputs
    def test_against_reference(self, test_id, info, args_kwargs):
        (input, *other_args), kwargs = args_kwargs.load("cpu")

        actual = info.kernel(input.as_subclass(torch.Tensor), *other_args, **kwargs)
        # We intentionally don't unwrap the input of the reference function in order for it to have access to all
        # metadata regardless of whether the kernel takes it explicitly or not
        expected = info.reference_fn(input, *other_args, **kwargs)

        assert_close(
            actual,
            expected,
            **info.get_closeness_kwargs(test_id, dtype=input.dtype, device=input.device),
            msg=parametrized_error_message(input, *other_args, **kwargs),
        )

    @make_info_args_kwargs_parametrization(
        [info for info in KERNEL_INFOS if info.float32_vs_uint8],
        args_kwargs_fn=lambda info: info.reference_inputs_fn(),
    )
    def test_float32_vs_uint8(self, test_id, info, args_kwargs):
        (input, *other_args), kwargs = args_kwargs.load("cpu")
        input = input.as_subclass(torch.Tensor)

        if input.dtype != torch.uint8:
            pytest.skip(f"Input dtype is {input.dtype}.")

        adapted_other_args, adapted_kwargs = info.float32_vs_uint8(other_args, kwargs)

        actual = info.kernel(
            F.to_dtype_image(input, dtype=torch.float32, scale=True),
            *adapted_other_args,
            **adapted_kwargs,
        )

        expected = F.to_dtype_image(info.kernel(input, *other_args, **kwargs), dtype=torch.float32, scale=True)

        assert_close(
            actual,
            expected,
            **info.get_closeness_kwargs(test_id, dtype=torch.float32, device=input.device),
            msg=parametrized_error_message(input, *other_args, **kwargs),
        )


@pytest.fixture
def spy_on(mocker):
    def make_spy(fn, *, module=None, name=None):
        # TODO: we can probably get rid of the non-default modules and names if we eliminate aliasing
        module = module or fn.__module__
        name = name or fn.__name__
        spy = mocker.patch(f"{module}.{name}", wraps=fn)
        return spy

    return make_spy
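

# The `mocker` fixture used above is provided by the pytest-mock plugin. `mocker.patch(..., wraps=fn)`
# keeps the original behavior while recording calls, so the spies never change what the kernels and
# dispatchers actually do.

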
class TestDispatchers:
    image_sample_inputs = make_info_args_kwargs_parametrization(
        [info for info in DISPATCHER_INFOS if tv_tensors.Image in info.kernels],
        args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.Image),
    )

    @make_info_args_kwargs_parametrization(
        DISPATCHER_INFOS,
        args_kwargs_fn=lambda info: info.sample_inputs(),
    )
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_logging(self, spy_on, info, args_kwargs, device):
        spy = spy_on(torch._C._log_api_usage_once)

        args, kwargs = args_kwargs.load(device)
        info.dispatcher(*args, **kwargs)

        spy.assert_any_call(f"{info.dispatcher.__module__}.{info.id}")

    @ignore_jit_warning_no_profile
    @image_sample_inputs
    @pytest.mark.parametrize("device", cpu_and_cuda())
    def test_scripted_smoke(self, info, args_kwargs, device):
        dispatcher = script(info.dispatcher)

        (image_tv_tensor, *other_args), kwargs = args_kwargs.load(device)
        image_pure_tensor = torch.Tensor(image_tv_tensor)

        dispatcher(image_pure_tensor, *other_args, **kwargs)

    # TODO: We need this until the dispatchers below also have `DispatcherInfo`'s. If they do,
    # `test_scripted_smoke` replaces this test for them.
    @ignore_jit_warning_no_profile
    @pytest.mark.parametrize(
        "dispatcher",
        [
            F.get_dimensions,
            F.get_image_num_channels,
            F.get_image_size,
            F.get_num_channels,
            F.get_num_frames,
            F.get_size,
            F.rgb_to_grayscale,
            F.uniform_temporal_subsample,
        ],
        ids=lambda dispatcher: dispatcher.__name__,
    )
    def test_scriptable(self, dispatcher):
        script(dispatcher)

    @image_sample_inputs
    def test_pure_tensor_output_type(self, info, args_kwargs):
        (image_tv_tensor, *other_args), kwargs = args_kwargs.load()
        image_pure_tensor = image_tv_tensor.as_subclass(torch.Tensor)

        output = info.dispatcher(image_pure_tensor, *other_args, **kwargs)

        # We cannot use `isinstance` here since all tv_tensors are instances of `torch.Tensor` as well
        assert type(output) is torch.Tensor

    @make_info_args_kwargs_parametrization(
        [info for info in DISPATCHER_INFOS if info.pil_kernel_info is not None],
        args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.Image),
    )
    def test_pil_output_type(self, info, args_kwargs):
        (image_tv_tensor, *other_args), kwargs = args_kwargs.load()

        if image_tv_tensor.ndim > 3:
            pytest.skip("Input is batched")

        image_pil = F.to_pil_image(image_tv_tensor)

        output = info.dispatcher(image_pil, *other_args, **kwargs)

        assert isinstance(output, PIL.Image.Image)

    @make_info_args_kwargs_parametrization(
        DISPATCHER_INFOS,
        args_kwargs_fn=lambda info: info.sample_inputs(),
    )
    def test_tv_tensor_output_type(self, info, args_kwargs):
        (tv_tensor, *other_args), kwargs = args_kwargs.load()

        output = info.dispatcher(tv_tensor, *other_args, **kwargs)

        assert isinstance(output, type(tv_tensor))

        if isinstance(tv_tensor, tv_tensors.BoundingBoxes) and info.dispatcher is not F.convert_bounding_box_format:
            assert output.format == tv_tensor.format

    @pytest.mark.parametrize(
        ("dispatcher_info", "tv_tensor_type", "kernel_info"),
        [
            pytest.param(
                dispatcher_info, tv_tensor_type, kernel_info, id=f"{dispatcher_info.id}-{tv_tensor_type.__name__}"
            )
            for dispatcher_info in DISPATCHER_INFOS
            for tv_tensor_type, kernel_info in dispatcher_info.kernel_infos.items()
        ],
    )
    def test_dispatcher_kernel_signatures_consistency(self, dispatcher_info, tv_tensor_type, kernel_info):
        dispatcher_signature = inspect.signature(dispatcher_info.dispatcher)
        dispatcher_params = list(dispatcher_signature.parameters.values())[1:]

        kernel_signature = inspect.signature(kernel_info.kernel)
        kernel_params = list(kernel_signature.parameters.values())[1:]

        # We filter out metadata that is implicitly passed to the dispatcher through the input tv_tensor, but has
        # to be explicitly passed to the kernel.
        input_type = {v: k for k, v in dispatcher_info.kernels.items()}.get(kernel_info.kernel)
        explicit_metadata = {
            tv_tensors.BoundingBoxes: {"format", "canvas_size"},
        }
        kernel_params = [
            param for param in kernel_params if param.name not in explicit_metadata.get(input_type, set())
        ]

        dispatcher_params = iter(dispatcher_params)
        for dispatcher_param, kernel_param in zip(dispatcher_params, kernel_params):
            try:
                # In general, the dispatcher parameters are a superset of the kernel parameters. Thus, we filter out
                # dispatcher parameters that have no kernel equivalent while keeping the order intact.
                while dispatcher_param.name != kernel_param.name:
                    dispatcher_param = next(dispatcher_params)
            except StopIteration:
                raise AssertionError(
                    f"Parameter `{kernel_param.name}` of kernel `{kernel_info.id}` "
                    f"has no corresponding parameter on the dispatcher `{dispatcher_info.id}`."
                ) from None

            assert dispatcher_param == kernel_param
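
    # For example, a dispatcher like `resize` exposes parameters such as `interpolation` and
    # `antialias` that a bounding box kernel does not take; the loop above skips such
    # dispatcher-only parameters and only requires that the shared ones match in name, kind,
    # and default (the `inspect.Parameter` equality check).
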
- @pytest.mark.parametrize("info", DISPATCHER_INFOS, ids=lambda info: info.id)
- def test_unkown_type(self, info):
- unkown_input = object()
- (_, *other_args), kwargs = next(iter(info.sample_inputs())).load("cpu")
- with pytest.raises(TypeError, match=re.escape(str(type(unkown_input)))):
- info.dispatcher(unkown_input, *other_args, **kwargs)
    @make_info_args_kwargs_parametrization(
        [
            info
            for info in DISPATCHER_INFOS
            if tv_tensors.BoundingBoxes in info.kernels and info.dispatcher is not F.convert_bounding_box_format
        ],
        args_kwargs_fn=lambda info: info.sample_inputs(tv_tensors.BoundingBoxes),
    )
    def test_bounding_boxes_format_consistency(self, info, args_kwargs):
        (bounding_boxes, *other_args), kwargs = args_kwargs.load()
        format = bounding_boxes.format

        output = info.dispatcher(bounding_boxes, *other_args, **kwargs)

        assert output.format == format


@pytest.mark.parametrize(
    ("alias", "target"),
    [
        pytest.param(alias, target, id=alias.__name__)
        for alias, target in [
            (F.hflip, F.horizontal_flip),
            (F.vflip, F.vertical_flip),
            (F.get_image_num_channels, F.get_num_channels),
            (F.to_pil_image, F.to_pil_image),
            (F.elastic_transform, F.elastic),
            (F.to_grayscale, F.rgb_to_grayscale),
        ]
    ],
)
def test_alias(alias, target):
    assert alias is target


- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize("num_channels", [1, 3])
- def test_normalize_image_tensor_stats(device, num_channels):
- stats = pytest.importorskip("scipy.stats", reason="SciPy is not available")
- def assert_samples_from_standard_normal(t):
- p_value = stats.kstest(t.flatten(), cdf="norm", args=(0, 1)).pvalue
- return p_value > 1e-4
- image = torch.rand(num_channels, DEFAULT_SQUARE_SPATIAL_SIZE, DEFAULT_SQUARE_SPATIAL_SIZE)
- mean = image.mean(dim=(1, 2)).tolist()
- std = image.std(dim=(1, 2)).tolist()
- assert_samples_from_standard_normal(F.normalize_image(image, mean, std))
class TestClampBoundingBoxes:
    @pytest.mark.parametrize(
        "metadata",
        [
            dict(),
            dict(format=tv_tensors.BoundingBoxFormat.XYXY),
            dict(canvas_size=(1, 1)),
        ],
    )
    def test_pure_tensor_insufficient_metadata(self, metadata):
        pure_tensor = next(make_multiple_bounding_boxes()).as_subclass(torch.Tensor)

        with pytest.raises(ValueError, match=re.escape("`format` and `canvas_size` has to be passed")):
            F.clamp_bounding_boxes(pure_tensor, **metadata)

    @pytest.mark.parametrize(
        "metadata",
        [
            dict(format=tv_tensors.BoundingBoxFormat.XYXY),
            dict(canvas_size=(1, 1)),
            dict(format=tv_tensors.BoundingBoxFormat.XYXY, canvas_size=(1, 1)),
        ],
    )
    def test_tv_tensor_explicit_metadata(self, metadata):
        tv_tensor = next(make_multiple_bounding_boxes())

        with pytest.raises(ValueError, match=re.escape("`format` and `canvas_size` must not be passed")):
            F.clamp_bounding_boxes(tv_tensor, **metadata)


class TestConvertFormatBoundingBoxes:
    @pytest.mark.parametrize(
        ("inpt", "old_format"),
        [
            (next(make_multiple_bounding_boxes()), None),
            (next(make_multiple_bounding_boxes()).as_subclass(torch.Tensor), tv_tensors.BoundingBoxFormat.XYXY),
        ],
    )
    def test_missing_new_format(self, inpt, old_format):
        with pytest.raises(TypeError, match=re.escape("missing 1 required argument: 'new_format'")):
            F.convert_bounding_box_format(inpt, old_format)

    def test_pure_tensor_insufficient_metadata(self):
        pure_tensor = next(make_multiple_bounding_boxes()).as_subclass(torch.Tensor)

        with pytest.raises(ValueError, match=re.escape("`old_format` has to be passed")):
            F.convert_bounding_box_format(pure_tensor, new_format=tv_tensors.BoundingBoxFormat.CXCYWH)

    def test_tv_tensor_explicit_metadata(self):
        tv_tensor = next(make_multiple_bounding_boxes())

        with pytest.raises(ValueError, match=re.escape("`old_format` must not be passed")):
            F.convert_bounding_box_format(
                tv_tensor, old_format=tv_tensor.format, new_format=tv_tensors.BoundingBoxFormat.CXCYWH
            )


# TODO: All correctness checks below this line should be ported to be references on a `KernelInfo` in
# `transforms_v2_kernel_infos.py`


def _compute_affine_matrix(angle_, translate_, scale_, shear_, center_):
    rot = math.radians(angle_)
    cx, cy = center_
    tx, ty = translate_
    sx, sy = [math.radians(sh_) for sh_ in shear_]

    c_matrix = np.array([[1, 0, cx], [0, 1, cy], [0, 0, 1]])
    t_matrix = np.array([[1, 0, tx], [0, 1, ty], [0, 0, 1]])
    c_matrix_inv = np.linalg.inv(c_matrix)
    rs_matrix = np.array(
        [
            [scale_ * math.cos(rot), -scale_ * math.sin(rot), 0],
            [scale_ * math.sin(rot), scale_ * math.cos(rot), 0],
            [0, 0, 1],
        ]
    )
    shear_x_matrix = np.array([[1, -math.tan(sx), 0], [0, 1, 0], [0, 0, 1]])
    shear_y_matrix = np.array([[1, 0, 0], [-math.tan(sy), 1, 0], [0, 0, 1]])
    rss_matrix = np.matmul(rs_matrix, np.matmul(shear_y_matrix, shear_x_matrix))
    true_matrix = np.matmul(t_matrix, np.matmul(c_matrix, np.matmul(rss_matrix, c_matrix_inv)))
    return true_matrix
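

# Reading the composition above right to left: points are shifted so the center sits at the origin,
# sheared, scaled, and rotated there, shifted back, and finally translated, i.e.
# M = T @ C @ (RS @ ShearY @ ShearX) @ C^-1.

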
- @pytest.mark.parametrize("device", cpu_and_cuda())
- def test_correctness_vertical_flip_segmentation_mask_on_fixed_input(device):
- mask = torch.zeros((3, 3, 3), dtype=torch.long, device=device)
- mask[:, 0, :] = 1
- out_mask = F.vertical_flip_mask(mask)
- expected_mask = torch.zeros((3, 3, 3), dtype=torch.long, device=device)
- expected_mask[:, -1, :] = 1
- torch.testing.assert_close(out_mask, expected_mask)
- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize(
- "format",
- [tv_tensors.BoundingBoxFormat.XYXY, tv_tensors.BoundingBoxFormat.XYWH, tv_tensors.BoundingBoxFormat.CXCYWH],
- )
- @pytest.mark.parametrize(
- "top, left, height, width, size",
- [
- [0, 0, 30, 30, (60, 60)],
- [-5, 5, 35, 45, (32, 34)],
- ],
- )
- def test_correctness_resized_crop_bounding_boxes(device, format, top, left, height, width, size):
- def _compute_expected_bbox(bbox, top_, left_, height_, width_, size_):
- # bbox should be xyxy
- bbox[0] = (bbox[0] - left_) * size_[1] / width_
- bbox[1] = (bbox[1] - top_) * size_[0] / height_
- bbox[2] = (bbox[2] - left_) * size_[1] / width_
- bbox[3] = (bbox[3] - top_) * size_[0] / height_
- return bbox
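
    # A resized crop first shifts the box by the crop origin (left_, top_) and then rescales each
    # coordinate by the ratio between the output size and the crop size, which is exactly what the
    # helper above does per XYXY coordinate.
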
    canvas_size = (100, 100)
    in_boxes = [
        [10.0, 10.0, 20.0, 20.0],
        [5.0, 10.0, 15.0, 20.0],
    ]
    expected_bboxes = []
    for in_box in in_boxes:
        expected_bboxes.append(_compute_expected_bbox(list(in_box), top, left, height, width, size))
    expected_bboxes = torch.tensor(expected_bboxes, device=device)

    in_boxes = tv_tensors.BoundingBoxes(
        in_boxes, format=tv_tensors.BoundingBoxFormat.XYXY, canvas_size=canvas_size, device=device
    )
    if format != tv_tensors.BoundingBoxFormat.XYXY:
        in_boxes = convert_bounding_box_format(in_boxes, tv_tensors.BoundingBoxFormat.XYXY, format)

    output_boxes, output_canvas_size = F.resized_crop_bounding_boxes(in_boxes, format, top, left, height, width, size)

    if format != tv_tensors.BoundingBoxFormat.XYXY:
        output_boxes = convert_bounding_box_format(output_boxes, format, tv_tensors.BoundingBoxFormat.XYXY)

    torch.testing.assert_close(output_boxes, expected_bboxes)
    torch.testing.assert_close(output_canvas_size, size)


def _parse_padding(padding):
    if isinstance(padding, int):
        return [padding] * 4
    if isinstance(padding, list):
        if len(padding) == 1:
            return padding * 4
        if len(padding) == 2:
            return padding * 2  # [left, up, right, down]
    return padding
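

# e.g. 1 -> [1, 1, 1, 1]; [1] -> [1, 1, 1, 1]; [1, 2] -> [1, 2, 1, 2]; a 4-element list is
# returned as-is in [left, up, right, down] order.

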
- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize("padding", [[1], [1, 1], [1, 1, 2, 2]])
- def test_correctness_pad_bounding_boxes(device, padding):
- def _compute_expected_bbox(bbox, format, padding_):
- pad_left, pad_up, _, _ = _parse_padding(padding_)
- dtype = bbox.dtype
- bbox = (
- bbox.clone()
- if format == tv_tensors.BoundingBoxFormat.XYXY
- else convert_bounding_box_format(bbox, old_format=format, new_format=tv_tensors.BoundingBoxFormat.XYXY)
- )
- bbox[0::2] += pad_left
- bbox[1::2] += pad_up
- bbox = convert_bounding_box_format(bbox, old_format=tv_tensors.BoundingBoxFormat.XYXY, new_format=format)
- if bbox.dtype != dtype:
- # Temporary cast to original dtype
- # e.g. float32 -> int
- bbox = bbox.to(dtype)
- return bbox
- def _compute_expected_canvas_size(bbox, padding_):
- pad_left, pad_up, pad_right, pad_down = _parse_padding(padding_)
- height, width = bbox.canvas_size
- return height + pad_up + pad_down, width + pad_left + pad_right
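
    # Only the left and top pads shift the box coordinates; the right and bottom pads merely grow
    # the canvas, which is covered by the canvas-size check below.
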
    for bboxes in make_multiple_bounding_boxes(extra_dims=((4,),)):
        bboxes = bboxes.to(device)
        bboxes_format = bboxes.format
        bboxes_canvas_size = bboxes.canvas_size

        output_boxes, output_canvas_size = F.pad_bounding_boxes(
            bboxes, format=bboxes_format, canvas_size=bboxes_canvas_size, padding=padding
        )

        torch.testing.assert_close(output_canvas_size, _compute_expected_canvas_size(bboxes, padding))

        expected_bboxes = torch.stack(
            [_compute_expected_bbox(b, bboxes_format, padding) for b in bboxes.reshape(-1, 4).unbind()]
        ).reshape(bboxes.shape)

        torch.testing.assert_close(output_boxes, expected_bboxes, atol=1, rtol=0)


- @pytest.mark.parametrize("device", cpu_and_cuda())
- def test_correctness_pad_segmentation_mask_on_fixed_input(device):
- mask = torch.ones((1, 3, 3), dtype=torch.long, device=device)
- out_mask = F.pad_mask(mask, padding=[1, 1, 1, 1])
- expected_mask = torch.zeros((1, 5, 5), dtype=torch.long, device=device)
- expected_mask[:, 1:-1, 1:-1] = 1
- torch.testing.assert_close(out_mask, expected_mask)
- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize(
- "startpoints, endpoints",
- [
- [[[0, 0], [33, 0], [33, 25], [0, 25]], [[3, 2], [32, 3], [30, 24], [2, 25]]],
- [[[3, 2], [32, 3], [30, 24], [2, 25]], [[0, 0], [33, 0], [33, 25], [0, 25]]],
- [[[3, 2], [32, 3], [30, 24], [2, 25]], [[5, 5], [30, 3], [33, 19], [4, 25]]],
- ],
- )
- def test_correctness_perspective_bounding_boxes(device, startpoints, endpoints):
- def _compute_expected_bbox(bbox, format_, canvas_size_, pcoeffs_):
- m1 = np.array(
- [
- [pcoeffs_[0], pcoeffs_[1], pcoeffs_[2]],
- [pcoeffs_[3], pcoeffs_[4], pcoeffs_[5]],
- ]
- )
- m2 = np.array(
- [
- [pcoeffs_[6], pcoeffs_[7], 1.0],
- [pcoeffs_[6], pcoeffs_[7], 1.0],
- ]
- )
- bbox_xyxy = convert_bounding_box_format(bbox, old_format=format_, new_format=tv_tensors.BoundingBoxFormat.XYXY)
- points = np.array(
- [
- [bbox_xyxy[0].item(), bbox_xyxy[1].item(), 1.0],
- [bbox_xyxy[2].item(), bbox_xyxy[1].item(), 1.0],
- [bbox_xyxy[0].item(), bbox_xyxy[3].item(), 1.0],
- [bbox_xyxy[2].item(), bbox_xyxy[3].item(), 1.0],
- ]
- )
- numer = np.matmul(points, m1.T)
- denom = np.matmul(points, m2.T)
- transformed_points = numer / denom
- out_bbox = np.array(
- [
- np.min(transformed_points[:, 0]),
- np.min(transformed_points[:, 1]),
- np.max(transformed_points[:, 0]),
- np.max(transformed_points[:, 1]),
- ]
- )
- out_bbox = torch.from_numpy(out_bbox)
- out_bbox = convert_bounding_box_format(
- out_bbox, old_format=tv_tensors.BoundingBoxFormat.XYXY, new_format=format_
- )
- return clamp_bounding_boxes(out_bbox, format=format_, canvas_size=canvas_size_).to(bbox)
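
    # The helper mirrors the projective mapping used by the kernel: a point (x, y) maps to
    # ((ax + by + c) / (gx + hy + 1), (dx + ey + f) / (gx + hy + 1)), and the expected box is the
    # axis-aligned hull of the four transformed corners, clamped to the canvas.
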
    canvas_size = (32, 38)

    pcoeffs = _get_perspective_coeffs(startpoints, endpoints)
    inv_pcoeffs = _get_perspective_coeffs(endpoints, startpoints)

    for bboxes in make_multiple_bounding_boxes(spatial_size=canvas_size, extra_dims=((4,),)):
        bboxes = bboxes.to(device)

        output_bboxes = F.perspective_bounding_boxes(
            bboxes.as_subclass(torch.Tensor),
            format=bboxes.format,
            canvas_size=bboxes.canvas_size,
            startpoints=None,
            endpoints=None,
            coefficients=pcoeffs,
        )

        expected_bboxes = torch.stack(
            [
                _compute_expected_bbox(b, bboxes.format, bboxes.canvas_size, inv_pcoeffs)
                for b in bboxes.reshape(-1, 4).unbind()
            ]
        ).reshape(bboxes.shape)

        torch.testing.assert_close(output_bboxes, expected_bboxes, rtol=0, atol=1)


- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize(
- "output_size",
- [(18, 18), [18, 15], (16, 19), [12], [46, 48]],
- )
- def test_correctness_center_crop_bounding_boxes(device, output_size):
- def _compute_expected_bbox(bbox, format_, canvas_size_, output_size_):
- dtype = bbox.dtype
- bbox = convert_bounding_box_format(bbox.float(), format_, tv_tensors.BoundingBoxFormat.XYWH)
- if len(output_size_) == 1:
- output_size_.append(output_size_[-1])
- cy = int(round((canvas_size_[0] - output_size_[0]) * 0.5))
- cx = int(round((canvas_size_[1] - output_size_[1]) * 0.5))
- out_bbox = [
- bbox[0].item() - cx,
- bbox[1].item() - cy,
- bbox[2].item(),
- bbox[3].item(),
- ]
- out_bbox = torch.tensor(out_bbox)
- out_bbox = convert_bounding_box_format(out_bbox, tv_tensors.BoundingBoxFormat.XYWH, format_)
- out_bbox = clamp_bounding_boxes(out_bbox, format=format_, canvas_size=output_size)
- return out_bbox.to(dtype=dtype, device=bbox.device)
    for bboxes in make_multiple_bounding_boxes(extra_dims=((4,),)):
        bboxes = bboxes.to(device)
        bboxes_format = bboxes.format
        bboxes_canvas_size = bboxes.canvas_size

        output_boxes, output_canvas_size = F.center_crop_bounding_boxes(
            bboxes, bboxes_format, bboxes_canvas_size, output_size
        )

        expected_bboxes = torch.stack(
            [
                _compute_expected_bbox(b, bboxes_format, bboxes_canvas_size, output_size)
                for b in bboxes.reshape(-1, 4).unbind()
            ]
        ).reshape(bboxes.shape)

        torch.testing.assert_close(output_boxes, expected_bboxes, atol=1, rtol=0)
        torch.testing.assert_close(output_canvas_size, output_size)


- @pytest.mark.parametrize("device", cpu_and_cuda())
- @pytest.mark.parametrize("output_size", [[4, 2], [4], [7, 6]])
- def test_correctness_center_crop_mask(device, output_size):
- def _compute_expected_mask(mask, output_size):
- crop_height, crop_width = output_size if len(output_size) > 1 else [output_size[0], output_size[0]]
- _, image_height, image_width = mask.shape
- if crop_width > image_height or crop_height > image_width:
- padding = _center_crop_compute_padding(crop_height, crop_width, image_height, image_width)
- mask = F.pad_image(mask, padding, fill=0)
- left = round((image_width - crop_width) * 0.5)
- top = round((image_height - crop_height) * 0.5)
- return mask[:, top : top + crop_height, left : left + crop_width]
- mask = torch.randint(0, 2, size=(1, 6, 6), dtype=torch.long, device=device)
- actual = F.center_crop_mask(mask, output_size)
- expected = _compute_expected_mask(mask, output_size)
- torch.testing.assert_close(expected, actual)
# Copied from test/test_functional_tensor.py
@pytest.mark.parametrize("device", cpu_and_cuda())
@pytest.mark.parametrize("canvas_size", ("small", "large"))
@pytest.mark.parametrize("dt", [None, torch.float32, torch.float64, torch.float16])
@pytest.mark.parametrize("ksize", [(3, 3), [3, 5], (23, 23)])
@pytest.mark.parametrize("sigma", [[0.5, 0.5], (0.5, 0.5), (0.8, 0.8), (1.7, 1.7)])
def test_correctness_gaussian_blur_image_tensor(device, canvas_size, dt, ksize, sigma):
    fn = F.gaussian_blur_image

    # true_cv2_results = {
    #     # np_img = np.arange(3 * 10 * 12, dtype="uint8").reshape((10, 12, 3))
    #     # cv2.GaussianBlur(np_img, ksize=(3, 3), sigmaX=0.8)
    #     "3_3_0.8": ...
    #     # cv2.GaussianBlur(np_img, ksize=(3, 3), sigmaX=0.5)
    #     "3_3_0.5": ...
    #     # cv2.GaussianBlur(np_img, ksize=(3, 5), sigmaX=0.8)
    #     "3_5_0.8": ...
    #     # cv2.GaussianBlur(np_img, ksize=(3, 5), sigmaX=0.5)
    #     "3_5_0.5": ...
    #     # np_img2 = np.arange(26 * 28, dtype="uint8").reshape((26, 28))
    #     # cv2.GaussianBlur(np_img2, ksize=(23, 23), sigmaX=1.7)
    #     "23_23_1.7": ...
    # }
    p = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets", "gaussian_blur_opencv_results.pt")
    true_cv2_results = torch.load(p)

    if canvas_size == "small":
        tensor = (
            torch.from_numpy(np.arange(3 * 10 * 12, dtype="uint8").reshape((10, 12, 3))).permute(2, 0, 1).to(device)
        )
    else:
        tensor = torch.from_numpy(np.arange(26 * 28, dtype="uint8").reshape((1, 26, 28))).to(device)

    if dt == torch.float16 and device == "cpu":
        pytest.skip("float16 on CPU is not tested here")

    if dt is not None:
        tensor = tensor.to(dtype=dt)

    _ksize = (ksize, ksize) if isinstance(ksize, int) else ksize
    _sigma = sigma[0] if sigma is not None else None
    shape = tensor.shape
    gt_key = f"{shape[-2]}_{shape[-1]}_{shape[-3]}__{_ksize[0]}_{_ksize[1]}_{_sigma}"
    if gt_key not in true_cv2_results:
        pytest.skip(f"No OpenCV reference result for {gt_key}")

    true_out = (
        torch.tensor(true_cv2_results[gt_key]).reshape(shape[-2], shape[-1], shape[-3]).permute(2, 0, 1).to(tensor)
    )

    image = tv_tensors.Image(tensor)

    out = fn(image, kernel_size=ksize, sigma=sigma)
    torch.testing.assert_close(out, true_out, rtol=0.0, atol=1.0, msg=f"{ksize}, {sigma}")


@pytest.mark.parametrize(
    "inpt",
    [
        127 * np.ones((32, 32, 3), dtype="uint8"),
        PIL.Image.new("RGB", (32, 32), 122),
    ],
)
def test_to_image(inpt):
    output = F.to_image(inpt)
    assert isinstance(output, torch.Tensor)
    assert output.shape == (3, 32, 32)

    assert np.asarray(inpt).sum() == output.sum().item()


@pytest.mark.parametrize(
    "inpt",
    [
        torch.randint(0, 256, size=(3, 32, 32), dtype=torch.uint8),
        127 * np.ones((32, 32, 3), dtype="uint8"),
    ],
)
@pytest.mark.parametrize("mode", [None, "RGB"])
def test_to_pil_image(inpt, mode):
    output = F.to_pil_image(inpt, mode=mode)
    assert isinstance(output, PIL.Image.Image)

    assert np.asarray(inpt).sum() == np.asarray(output).sum()


def test_equalize_image_tensor_edge_cases():
    inpt = torch.zeros(3, 200, 200, dtype=torch.uint8)
    output = F.equalize_image(inpt)
    torch.testing.assert_close(inpt, output)

    inpt = torch.zeros(5, 3, 200, 200, dtype=torch.uint8)
    inpt[..., 100:, 100:] = 1
    output = F.equalize_image(inpt)
    assert output.unique().tolist() == [0, 255]
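

# `uniform_temporal_subsample` keeps `num_samples` frames whose indices are evenly spaced over the
# temporal dimension, always including the first and last frame; e.g. subsampling 10 frames to 5
# keeps frames 0, 2, 4, 6, and 9, which is exactly what the asserts below verify (each frame of the
# input video holds its own index as the pixel value).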
- @pytest.mark.parametrize("device", cpu_and_cuda())
- def test_correctness_uniform_temporal_subsample(device):
- video = torch.arange(10, device=device)[:, None, None, None].expand(-1, 3, 8, 8)
- out_video = F.uniform_temporal_subsample(video, 5)
- assert out_video.unique().tolist() == [0, 2, 4, 6, 9]
- out_video = F.uniform_temporal_subsample(video, 8)
- assert out_video.unique().tolist() == [0, 1, 2, 3, 5, 6, 7, 9]