- """
- As the name implies, these are legacy utilities that will hopefully be removed soon. The future
- of transforms v2 testing is in test/test_transforms_v2_refactored.py. All new tests should be
- implemented there and must not use any of the utilities here.
- The following legacy modules depend on this module:
- - transforms_v2_kernel_infos.py
- - transforms_v2_dispatcher_infos.py
- - test_transforms_v2_functional.py
- - test_transforms_v2_consistency.py
- - test_transforms.py
- When all the logic is ported from the files above to test_transforms_v2_refactored.py, delete
- all the legacy modules including this one and drop the _refactored prefix from the name.
- """
- import collections.abc
- import dataclasses
- import enum
- import itertools
- import pathlib
- from collections import defaultdict
- from typing import Callable, Sequence, Tuple, Union
- import PIL.Image
- import pytest
- import torch
- from torchvision import tv_tensors
- from torchvision.transforms._functional_tensor import _max_value as get_max_value
- from torchvision.transforms.v2.functional import to_dtype_image, to_image, to_pil_image
- def combinations_grid(**kwargs):
- """Creates a grid of input combinations.
- Each element of the returned sequence is a dictionary mapping the keyword names to one possible combination of values.
- Example:
- >>> combinations_grid(foo=("bar", "baz"), spam=("eggs", "ham"))
- [
- {'foo': 'bar', 'spam': 'eggs'},
- {'foo': 'bar', 'spam': 'ham'},
- {'foo': 'baz', 'spam': 'eggs'},
- {'foo': 'baz', 'spam': 'ham'}
- ]
- """
- return [dict(zip(kwargs.keys(), values)) for values in itertools.product(*kwargs.values())]
- DEFAULT_SIZE = (17, 11)
- NUM_CHANNELS_MAP = {
- "GRAY": 1,
- "GRAY_ALPHA": 2,
- "RGB": 3,
- "RGBA": 4,
- }
- def make_image(
- size=DEFAULT_SIZE,
- *,
- color_space="RGB",
- batch_dims=(),
- dtype=None,
- device="cpu",
- memory_format=torch.contiguous_format,
- ):
- num_channels = NUM_CHANNELS_MAP[color_space]
- dtype = dtype or torch.uint8
- max_value = get_max_value(dtype)
- data = torch.testing.make_tensor(
- (*batch_dims, num_channels, *size),
- low=0,
- high=max_value,
- dtype=dtype,
- device=device,
- memory_format=memory_format,
- )
- if color_space in {"GRAY_ALPHA", "RGBA"}:
- data[..., -1, :, :] = max_value
- return tv_tensors.Image(data)
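- # Illustrative usage sketch (comment only, not part of the original module): for color
- # spaces with an alpha channel, the alpha channel is forced to `max_value`, i.e. fully opaque.
- #
- #     image = make_image((32, 32), color_space="RGBA", batch_dims=(2,))
- #     image.shape                   # torch.Size([2, 4, 32, 32]), i.e. (*batch_dims, C, H, W)
- #     image[..., -1, :, :].unique() # tensor([255], dtype=torch.uint8)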
- def make_image_tensor(*args, **kwargs):
- return make_image(*args, **kwargs).as_subclass(torch.Tensor)
- def make_image_pil(*args, **kwargs):
- return to_pil_image(make_image(*args, **kwargs))
- def make_bounding_boxes(
- canvas_size=DEFAULT_SIZE,
- *,
- format=tv_tensors.BoundingBoxFormat.XYXY,
- batch_dims=(),
- dtype=None,
- device="cpu",
- ):
- def sample_position(values, max_value):
- # We cannot use torch.randint directly here, because it only accepts integer scalars for low and high.
- # However, with non-empty batch_dims we need tensors as limits.
- return torch.stack([torch.randint(max_value - v, ()) for v in values.flatten().tolist()]).reshape(values.shape)
- if isinstance(format, str):
- format = tv_tensors.BoundingBoxFormat[format]
- dtype = dtype or torch.float32
- if any(dim == 0 for dim in batch_dims):
- return tv_tensors.BoundingBoxes(
- torch.empty(*batch_dims, 4, dtype=dtype, device=device), format=format, canvas_size=canvas_size
- )
- h, w = [torch.randint(1, c, batch_dims) for c in canvas_size]
- y = sample_position(h, canvas_size[0])
- x = sample_position(w, canvas_size[1])
- if format is tv_tensors.BoundingBoxFormat.XYWH:
- parts = (x, y, w, h)
- elif format is tv_tensors.BoundingBoxFormat.XYXY:
- x1, y1 = x, y
- x2 = x1 + w
- y2 = y1 + h
- parts = (x1, y1, x2, y2)
- elif format is tv_tensors.BoundingBoxFormat.CXCYWH:
- cx = x + w / 2
- cy = y + h / 2
- parts = (cx, cy, w, h)
- else:
- raise ValueError(f"Format {format} is not supported")
- return tv_tensors.BoundingBoxes(
- torch.stack(parts, dim=-1).to(dtype=dtype, device=device), format=format, canvas_size=canvas_size
- )
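- # Illustrative usage sketch (comment only): boxes are sampled so that they always lie
- # fully inside the canvas, regardless of the requested format.
- #
- #     boxes = make_bounding_boxes((480, 640), format="XYWH", batch_dims=(5,))
- #     boxes.shape        # torch.Size([5, 4])
- #     boxes.canvas_size  # (480, 640)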
- def make_detection_mask(size=DEFAULT_SIZE, *, num_objects=5, batch_dims=(), dtype=None, device="cpu"):
- """Make a "detection" mask, i.e. (*, N, H, W), where each object is encoded as one of N boolean masks"""
- return tv_tensors.Mask(
- torch.testing.make_tensor(
- (*batch_dims, num_objects, *size),
- low=0,
- high=2,
- dtype=dtype or torch.bool,
- device=device,
- )
- )
- def make_segmentation_mask(size=DEFAULT_SIZE, *, num_categories=10, batch_dims=(), dtype=None, device="cpu"):
- """Make a "segmentation" mask, i.e. (*, H, W), where the category is encoded as pixel value"""
- return tv_tensors.Mask(
- torch.testing.make_tensor(
- (*batch_dims, *size),
- low=0,
- high=num_categories,
- dtype=dtype or torch.uint8,
- device=device,
- )
- )
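- # Illustrative usage sketch (comment only) contrasting the two mask layouts:
- #
- #     make_detection_mask((16, 16), num_objects=5).shape  # (5, 16, 16), one boolean mask per object
- #     make_segmentation_mask((16, 16), num_categories=10) # (16, 16), uint8 values in [0, 10)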
- def make_video(size=DEFAULT_SIZE, *, num_frames=3, batch_dims=(), **kwargs):
- return tv_tensors.Video(make_image(size, batch_dims=(*batch_dims, num_frames), **kwargs))
- def make_video_tensor(*args, **kwargs):
- return make_video(*args, **kwargs).as_subclass(torch.Tensor)
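- # Illustrative usage sketch (comment only): videos are just batched images, with the
- # frame dimension folded into `batch_dims`.
- #
- #     make_video((16, 16), num_frames=4, batch_dims=(2,)).shape  # (2, 4, 3, 16, 16)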
- DEFAULT_SQUARE_SPATIAL_SIZE = 15
- DEFAULT_LANDSCAPE_SPATIAL_SIZE = (7, 33)
- DEFAULT_PORTRAIT_SPATIAL_SIZE = (31, 9)
- DEFAULT_SPATIAL_SIZES = (
- DEFAULT_LANDSCAPE_SPATIAL_SIZE,
- DEFAULT_PORTRAIT_SPATIAL_SIZE,
- DEFAULT_SQUARE_SPATIAL_SIZE,
- )
- def _parse_size(size, *, name="size"):
- if size == "random":
- raise ValueError("This should never happen")
- elif isinstance(size, int) and size > 0:
- return (size, size)
- elif (
- isinstance(size, collections.abc.Sequence)
- and len(size) == 2
- and all(isinstance(length, int) and length > 0 for length in size)
- ):
- return tuple(size)
- else:
- raise pytest.UsageError(
- f"'{name}' can either be `'random'`, a positive integer, or a sequence of two positive integers,"
- f"but got {size} instead."
- )
- def get_num_channels(color_space):
- num_channels = NUM_CHANNELS_MAP.get(color_space)
- if not num_channels:
- raise pytest.UsageError(f"Can't determine the number of channels for color space {color_space}")
- return num_channels
- VALID_EXTRA_DIMS = ((), (4,), (2, 3))
- DEGENERATE_BATCH_DIMS = ((0,), (5, 0), (0, 5))
- DEFAULT_EXTRA_DIMS = (*VALID_EXTRA_DIMS, *DEGENERATE_BATCH_DIMS)
- def from_loader(loader_fn):
- def wrapper(*args, **kwargs):
- device = kwargs.pop("device", "cpu")
- loader = loader_fn(*args, **kwargs)
- return loader.load(device)
- return wrapper
- def from_loaders(loaders_fn):
- def wrapper(*args, **kwargs):
- device = kwargs.pop("device", "cpu")
- loaders = loaders_fn(*args, **kwargs)
- for loader in loaders:
- yield loader.load(device)
- return wrapper
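- # Illustrative sketch (comment only) of the wrapper pattern above. Given a loader
- # factory, e.g. `make_image_loader` defined below, `from_loader` turns it into a
- # function that builds the loader and immediately loads it onto a device:
- #
- #     make_image_eager = from_loader(make_image_loader)  # hypothetical name
- #     image = make_image_eager(color_space="RGB", device="cpu")
- #
- # `from_loaders` does the same for generator factories, yielding loaded samples.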
- @dataclasses.dataclass
- class TensorLoader:
- fn: Callable[[Sequence[int], torch.dtype, Union[str, torch.device]], torch.Tensor]
- shape: Sequence[int]
- dtype: torch.dtype
- def load(self, device):
- return self.fn(self.shape, self.dtype, device)
- @dataclasses.dataclass
- class ImageLoader(TensorLoader):
- spatial_size: Tuple[int, int] = dataclasses.field(init=False)
- num_channels: int = dataclasses.field(init=False)
- memory_format: torch.memory_format = torch.contiguous_format
- canvas_size: Tuple[int, int] = dataclasses.field(init=False)
- def __post_init__(self):
- self.spatial_size = self.canvas_size = self.shape[-2:]
- self.num_channels = self.shape[-3]
- def load(self, device):
- return self.fn(self.shape, self.dtype, device, memory_format=self.memory_format)
- def make_image_loader(
- size=DEFAULT_PORTRAIT_SPATIAL_SIZE,
- *,
- color_space="RGB",
- extra_dims=(),
- dtype=torch.float32,
- constant_alpha=True,
- memory_format=torch.contiguous_format,
- ):
- if not constant_alpha:
- raise ValueError("This should never happen")
- size = _parse_size(size)
- num_channels = get_num_channels(color_space)
- def fn(shape, dtype, device, memory_format):
- *batch_dims, _, height, width = shape
- return make_image(
- (height, width),
- color_space=color_space,
- batch_dims=batch_dims,
- dtype=dtype,
- device=device,
- memory_format=memory_format,
- )
- return ImageLoader(fn, shape=(*extra_dims, num_channels, *size), dtype=dtype, memory_format=memory_format)
- def make_image_loaders(
- *,
- sizes=DEFAULT_SPATIAL_SIZES,
- color_spaces=(
- "GRAY",
- "GRAY_ALPHA",
- "RGB",
- "RGBA",
- ),
- extra_dims=DEFAULT_EXTRA_DIMS,
- dtypes=(torch.float32, torch.float64, torch.uint8),
- constant_alpha=True,
- ):
- for params in combinations_grid(size=sizes, color_space=color_spaces, extra_dims=extra_dims, dtype=dtypes):
- yield make_image_loader(**params, constant_alpha=constant_alpha)
- make_images = from_loaders(make_image_loaders)
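- # Illustrative usage sketch (comment only): each loader carries its shape/dtype
- # metadata and only materializes a tensor on `load`.
- #
- #     for image_loader in make_image_loaders(sizes=((17, 11),), dtypes=(torch.uint8,)):
- #         image = image_loader.load("cpu")
- #         assert tuple(image.shape) == tuple(image_loader.shape)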
- def make_image_loader_for_interpolation(
- size=(233, 147), *, color_space="RGB", dtype=torch.uint8, memory_format=torch.contiguous_format
- ):
- size = _parse_size(size)
- num_channels = get_num_channels(color_space)
- def fn(shape, dtype, device, memory_format):
- height, width = shape[-2:]
- image_pil = (
- PIL.Image.open(pathlib.Path(__file__).parent / "assets" / "encode_jpeg" / "grace_hopper_517x606.jpg")
- .resize((width, height))
- .convert(
- {
- "GRAY": "L",
- "GRAY_ALPHA": "LA",
- "RGB": "RGB",
- "RGBA": "RGBA",
- }[color_space]
- )
- )
- image_tensor = to_image(image_pil)
- if memory_format == torch.contiguous_format:
- image_tensor = image_tensor.to(device=device, memory_format=memory_format, copy=True)
- else:
- image_tensor = image_tensor.to(device=device)
- image_tensor = to_dtype_image(image_tensor, dtype=dtype, scale=True)
- return tv_tensors.Image(image_tensor)
- return ImageLoader(fn, shape=(num_channels, *size), dtype=dtype, memory_format=memory_format)
- def make_image_loaders_for_interpolation(
- sizes=((233, 147),),
- color_spaces=("RGB",),
- dtypes=(torch.uint8,),
- memory_formats=(torch.contiguous_format, torch.channels_last),
- ):
- for params in combinations_grid(size=sizes, color_space=color_spaces, dtype=dtypes, memory_format=memory_formats):
- yield make_image_loader_for_interpolation(**params)
- @dataclasses.dataclass
- class BoundingBoxesLoader(TensorLoader):
- format: tv_tensors.BoundingBoxFormat
- spatial_size: Tuple[int, int]
- canvas_size: Tuple[int, int] = dataclasses.field(init=False)
- def __post_init__(self):
- self.canvas_size = self.spatial_size
- def make_bounding_box_loader(*, extra_dims=(), format, spatial_size=DEFAULT_PORTRAIT_SPATIAL_SIZE, dtype=torch.float32):
- if isinstance(format, str):
- format = tv_tensors.BoundingBoxFormat[format]
- spatial_size = _parse_size(spatial_size, name="spatial_size")
- def fn(shape, dtype, device):
- *batch_dims, num_coordinates = shape
- if num_coordinates != 4:
- raise pytest.UsageError(f"Expected 4 coordinates in the last dimension, but got {num_coordinates}")
- return make_bounding_boxes(
- format=format, canvas_size=spatial_size, batch_dims=batch_dims, dtype=dtype, device=device
- )
- return BoundingBoxesLoader(fn, shape=(*extra_dims[-1:], 4), dtype=dtype, format=format, spatial_size=spatial_size)
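- # Illustrative sketch (comment only): note that only the last entry of `extra_dims` is
- # used, since the loader shape is always `(*extra_dims[-1:], 4)`.
- #
- #     loader = make_bounding_box_loader(format="XYXY", extra_dims=(2, 3))
- #     loader.shape  # (3, 4)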
- def make_bounding_box_loaders(
- *,
- extra_dims=tuple(d for d in DEFAULT_EXTRA_DIMS if len(d) < 2),
- formats=tuple(tv_tensors.BoundingBoxFormat),
- spatial_size=DEFAULT_PORTRAIT_SPATIAL_SIZE,
- dtypes=(torch.float32, torch.float64, torch.int64),
- ):
- for params in combinations_grid(extra_dims=extra_dims, format=formats, dtype=dtypes):
- yield make_bounding_box_loader(**params, spatial_size=spatial_size)
- make_multiple_bounding_boxes = from_loaders(make_bounding_box_loaders)
- class MaskLoader(TensorLoader):
- pass
- def make_detection_mask_loader(size=DEFAULT_PORTRAIT_SPATIAL_SIZE, *, num_objects=5, extra_dims=(), dtype=torch.uint8):
- # This produces "detection" masks, i.e. `(*, N, H, W)`, where `N` denotes the number of objects
- size = _parse_size(size)
- def fn(shape, dtype, device):
- *batch_dims, num_objects, height, width = shape
- return make_detection_mask(
- (height, width), num_objects=num_objects, batch_dims=batch_dims, dtype=dtype, device=device
- )
- return MaskLoader(fn, shape=(*extra_dims, num_objects, *size), dtype=dtype)
- def make_detection_mask_loaders(
- sizes=DEFAULT_SPATIAL_SIZES,
- num_objects=(1, 0, 5),
- extra_dims=DEFAULT_EXTRA_DIMS,
- dtypes=(torch.uint8,),
- ):
- for params in combinations_grid(size=sizes, num_objects=num_objects, extra_dims=extra_dims, dtype=dtypes):
- yield make_detection_mask_loader(**params)
- make_detection_masks = from_loaders(make_detection_mask_loaders)
- def make_segmentation_mask_loader(
- size=DEFAULT_PORTRAIT_SPATIAL_SIZE, *, num_categories=10, extra_dims=(), dtype=torch.uint8
- ):
- # This produces "segmentation" masks, i.e. `(*, H, W)`, where the category is encoded in the values
- size = _parse_size(size)
- def fn(shape, dtype, device):
- *batch_dims, height, width = shape
- return make_segmentation_mask(
- (height, width), num_categories=num_categories, batch_dims=batch_dims, dtype=dtype, device=device
- )
- return MaskLoader(fn, shape=(*extra_dims, *size), dtype=dtype)
- def make_segmentation_mask_loaders(
- *,
- sizes=DEFAULT_SPATIAL_SIZES,
- num_categories=(1, 2, 10),
- extra_dims=DEFAULT_EXTRA_DIMS,
- dtypes=(torch.uint8,),
- ):
- for params in combinations_grid(size=sizes, num_categories=num_categories, extra_dims=extra_dims, dtype=dtypes):
- yield make_segmentation_mask_loader(**params)
- make_segmentation_masks = from_loaders(make_segmentation_mask_loaders)
- def make_mask_loaders(
- *,
- sizes=DEFAULT_SPATIAL_SIZES,
- num_objects=(1, 0, 5),
- num_categories=(1, 2, 10),
- extra_dims=DEFAULT_EXTRA_DIMS,
- dtypes=(torch.uint8,),
- ):
- yield from make_detection_mask_loaders(sizes=sizes, num_objects=num_objects, extra_dims=extra_dims, dtypes=dtypes)
- yield from make_segmentation_mask_loaders(
- sizes=sizes, num_categories=num_categories, extra_dims=extra_dims, dtypes=dtypes
- )
- make_masks = from_loaders(make_mask_loaders)
- class VideoLoader(ImageLoader):
- pass
- def make_video_loader(
- size=DEFAULT_PORTRAIT_SPATIAL_SIZE,
- *,
- color_space="RGB",
- num_frames=3,
- extra_dims=(),
- dtype=torch.uint8,
- ):
- size = _parse_size(size)
- def fn(shape, dtype, device, memory_format):
- *batch_dims, num_frames, _, height, width = shape
- return make_video(
- (height, width),
- num_frames=num_frames,
- batch_dims=batch_dims,
- color_space=color_space,
- dtype=dtype,
- device=device,
- memory_format=memory_format,
- )
- return VideoLoader(fn, shape=(*extra_dims, num_frames, get_num_channels(color_space), *size), dtype=dtype)
- def make_video_loaders(
- *,
- sizes=DEFAULT_SPATIAL_SIZES,
- color_spaces=(
- "GRAY",
- "RGB",
- ),
- num_frames=(1, 0, 3),
- extra_dims=DEFAULT_EXTRA_DIMS,
- dtypes=(torch.uint8, torch.float32, torch.float64),
- ):
- for params in combinations_grid(
- size=sizes, color_space=color_spaces, num_frames=num_frames, extra_dims=extra_dims, dtype=dtypes
- ):
- yield make_video_loader(**params)
- make_videos = from_loaders(make_video_loaders)
- class TestMark:
- def __init__(
- self,
- # Tuple of test class name and test function name that identifies the test the mark is applied to. If there is
- # no test class, i.e. a standalone test function, use `None`.
- test_id,
- # `pytest.mark.*` to apply, e.g. `pytest.mark.skip` or `pytest.mark.xfail`
- mark,
- *,
- # Callable that will be passed an `ArgsKwargs` and should return a boolean indicating whether the mark
- # should be applied. If omitted, the mark is always applied.
- condition=None,
- ):
- self.test_id = test_id
- self.mark = mark
- self.condition = condition or (lambda args_kwargs: True)
- def mark_framework_limitation(test_id, reason, condition=None):
- # The purpose of this function is to have a single entry point for skip marks that exist only because the test
- # framework cannot handle the kernel in general or a specific parameter combination.
- # As development progresses, we can change the `mark.skip` to `mark.xfail` from time to time to see if the skip is
- # still justified.
- # We don't want to use `mark.xfail` all the time, because that actually runs the test until an error happens.
- # That would waste CI resources for no reason most of the time.
- return TestMark(test_id, pytest.mark.skip(reason=reason), condition=condition)
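- # Illustrative usage sketch (comment only; the test and parameter names are hypothetical):
- #
- #     mark_framework_limitation(
- #         ("TestKernels", "test_scripted_smoke"),
- #         reason="Scripting is not supported for this parameter combination.",
- #         condition=lambda args_kwargs: args_kwargs.kwargs.get("interpolation") is None,
- #     )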
- class InfoBase:
- def __init__(
- self,
- *,
- # Identifier of the info that shows up in the parametrization.
- id,
- # Test markers that will be (conditionally) applied to an `ArgsKwargs` parametrization.
- # See the `TestMark` class for details
- test_marks=None,
- # Additional parameters, e.g. `rtol=1e-3`, that are passed to `assert_close`. Keys are 3-tuples of `test_id`
- # (see `TestMark`), dtype, and device.
- closeness_kwargs=None,
- ):
- self.id = id
- self.test_marks = test_marks or []
- test_marks_map = defaultdict(list)
- for test_mark in self.test_marks:
- test_marks_map[test_mark.test_id].append(test_mark)
- self._test_marks_map = dict(test_marks_map)
- self.closeness_kwargs = closeness_kwargs or dict()
- def get_marks(self, test_id, args_kwargs):
- return [
- test_mark.mark for test_mark in self._test_marks_map.get(test_id, []) if test_mark.condition(args_kwargs)
- ]
- def get_closeness_kwargs(self, test_id, *, dtype, device):
- if not (isinstance(test_id, tuple) and len(test_id) == 2):
- msg = "`test_id` should be a `Tuple[Optional[str], str]` denoting the test class and function name"
- if callable(test_id):
- msg += ". Did you forget to add the `test_id` fixture to parameters of the test?"
- else:
- msg += f", but got {test_id} instead."
- raise pytest.UsageError(msg)
- if isinstance(device, torch.device):
- device = device.type
- return self.closeness_kwargs.get((test_id, dtype, device), dict())
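- # Illustrative usage sketch (comment only; the test id is hypothetical): the
- # `(test_id, dtype, device)` key selects per-configuration tolerances.
- #
- #     info = InfoBase(
- #         id="resize",
- #         closeness_kwargs={(("TestKernels", "test_against_reference"), torch.uint8, "cpu"): dict(atol=1, rtol=0)},
- #     )
- #     info.get_closeness_kwargs(("TestKernels", "test_against_reference"), dtype=torch.uint8, device="cpu")
- #     # -> {'atol': 1, 'rtol': 0}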
- class ArgsKwargs:
- def __init__(self, *args, **kwargs):
- self.args = args
- self.kwargs = kwargs
- def __iter__(self):
- yield self.args
- yield self.kwargs
- def load(self, device="cpu"):
- return ArgsKwargs(
- *(arg.load(device) if isinstance(arg, TensorLoader) else arg for arg in self.args),
- **{
- keyword: arg.load(device) if isinstance(arg, TensorLoader) else arg
- for keyword, arg in self.kwargs.items()
- },
- )
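- # Illustrative usage sketch (comment only): any `TensorLoader` nested in the args or
- # kwargs is materialized on `load`; everything else passes through unchanged.
- #
- #     args, kwargs = ArgsKwargs(make_image_loader(), antialias=True).load("cpu")
- #     # args[0] is now a loaded `tv_tensors.Image`; kwargs == {"antialias": True}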
- def parametrized_error_message(*args, **kwargs):
- def to_str(obj):
- if isinstance(obj, torch.Tensor) and obj.numel() > 30:
- return f"tensor(shape={list(obj.shape)}, dtype={obj.dtype}, device={obj.device})"
- elif isinstance(obj, enum.Enum):
- return f"{type(obj).__name__}.{obj.name}"
- else:
- return repr(obj)
- if args or kwargs:
- postfix = "\n".join(
- [
- "",
- "Failure happened for the following parameters:",
- "",
- *[to_str(arg) for arg in args],
- *[f"{name}={to_str(kwarg)}" for name, kwarg in kwargs.items()],
- ]
- )
- else:
- postfix = ""
- def wrapper(msg):
- return msg + postfix
- return wrapper
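- # Illustrative usage sketch (comment only): the returned wrapper appends the (compactly
- # rendered) parameters to an existing failure message, e.g. as `msg=` for `assert_close`.
- #
- #     wrapper = parametrized_error_message(torch.rand(3, 500, 500), dim=-1)
- #     wrapper("tensors are not close")  # original message plus the parameter listing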
|