test_datasets_utils.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import contextlib
  2. import gzip
  3. import os
  4. import pathlib
  5. import re
  6. import tarfile
  7. import zipfile
  8. import pytest
  9. import torch
  10. import torchvision.datasets.utils as utils
  11. from common_utils import assert_equal
  12. from torch._utils_internal import get_file_path_2
  13. from torchvision.datasets.folder import make_dataset
  14. from torchvision.datasets.utils import _COMPRESSED_FILE_OPENERS
  15. TEST_FILE = get_file_path_2(
  16. os.path.dirname(os.path.abspath(__file__)), "assets", "encode_jpeg", "grace_hopper_517x606.jpg"
  17. )
  18. def patch_url_redirection(mocker, redirect_url):
  19. class Response:
  20. def __init__(self, url):
  21. self.url = url
  22. @contextlib.contextmanager
  23. def patched_opener(*args, **kwargs):
  24. yield Response(redirect_url)
  25. return mocker.patch("torchvision.datasets.utils.urllib.request.urlopen", side_effect=patched_opener)
  26. class TestDatasetsUtils:
  27. def test_get_redirect_url(self, mocker):
  28. url = "https://url.org"
  29. expected_redirect_url = "https://redirect.url.org"
  30. mock = patch_url_redirection(mocker, expected_redirect_url)
  31. actual = utils._get_redirect_url(url)
  32. assert actual == expected_redirect_url
  33. assert mock.call_count == 2
  34. call_args_1, call_args_2 = mock.call_args_list
  35. assert call_args_1[0][0].full_url == url
  36. assert call_args_2[0][0].full_url == expected_redirect_url
  37. def test_get_redirect_url_max_hops_exceeded(self, mocker):
  38. url = "https://url.org"
  39. redirect_url = "https://redirect.url.org"
  40. mock = patch_url_redirection(mocker, redirect_url)
  41. with pytest.raises(RecursionError):
  42. utils._get_redirect_url(url, max_hops=0)
  43. assert mock.call_count == 1
  44. assert mock.call_args[0][0].full_url == url
  45. def test_check_md5(self):
  46. fpath = TEST_FILE
  47. correct_md5 = "9c0bb82894bb3af7f7675ef2b3b6dcdc"
  48. false_md5 = ""
  49. assert utils.check_md5(fpath, correct_md5)
  50. assert not utils.check_md5(fpath, false_md5)
  51. def test_check_integrity(self):
  52. existing_fpath = TEST_FILE
  53. nonexisting_fpath = ""
  54. correct_md5 = "9c0bb82894bb3af7f7675ef2b3b6dcdc"
  55. false_md5 = ""
  56. assert utils.check_integrity(existing_fpath, correct_md5)
  57. assert not utils.check_integrity(existing_fpath, false_md5)
  58. assert utils.check_integrity(existing_fpath)
  59. assert not utils.check_integrity(nonexisting_fpath)
  60. def test_get_google_drive_file_id(self):
  61. url = "https://drive.google.com/file/d/1GO-BHUYRuvzr1Gtp2_fqXRsr9TIeYbhV/view"
  62. expected = "1GO-BHUYRuvzr1Gtp2_fqXRsr9TIeYbhV"
  63. actual = utils._get_google_drive_file_id(url)
  64. assert actual == expected
  65. def test_get_google_drive_file_id_invalid_url(self):
  66. url = "http://www.vision.caltech.edu/visipedia-data/CUB-200-2011/CUB_200_2011.tgz"
  67. assert utils._get_google_drive_file_id(url) is None
  68. @pytest.mark.parametrize(
  69. "file, expected",
  70. [
  71. ("foo.tar.bz2", (".tar.bz2", ".tar", ".bz2")),
  72. ("foo.tar.xz", (".tar.xz", ".tar", ".xz")),
  73. ("foo.tar", (".tar", ".tar", None)),
  74. ("foo.tar.gz", (".tar.gz", ".tar", ".gz")),
  75. ("foo.tbz", (".tbz", ".tar", ".bz2")),
  76. ("foo.tbz2", (".tbz2", ".tar", ".bz2")),
  77. ("foo.tgz", (".tgz", ".tar", ".gz")),
  78. ("foo.bz2", (".bz2", None, ".bz2")),
  79. ("foo.gz", (".gz", None, ".gz")),
  80. ("foo.zip", (".zip", ".zip", None)),
  81. ("foo.xz", (".xz", None, ".xz")),
  82. ("foo.bar.tar.gz", (".tar.gz", ".tar", ".gz")),
  83. ("foo.bar.gz", (".gz", None, ".gz")),
  84. ("foo.bar.zip", (".zip", ".zip", None)),
  85. ],
  86. )
  87. def test_detect_file_type(self, file, expected):
  88. assert utils._detect_file_type(file) == expected
  89. @pytest.mark.parametrize("file", ["foo", "foo.tar.baz", "foo.bar"])
  90. def test_detect_file_type_incompatible(self, file):
  91. # tests detect file type for no extension, unknown compression and unknown partial extension
  92. with pytest.raises(RuntimeError):
  93. utils._detect_file_type(file)
  94. @pytest.mark.parametrize("extension", [".bz2", ".gz", ".xz"])
  95. def test_decompress(self, extension, tmpdir):
  96. def create_compressed(root, content="this is the content"):
  97. file = os.path.join(root, "file")
  98. compressed = f"{file}{extension}"
  99. compressed_file_opener = _COMPRESSED_FILE_OPENERS[extension]
  100. with compressed_file_opener(compressed, "wb") as fh:
  101. fh.write(content.encode())
  102. return compressed, file, content
  103. compressed, file, content = create_compressed(tmpdir)
  104. utils._decompress(compressed)
  105. assert os.path.exists(file)
  106. with open(file) as fh:
  107. assert fh.read() == content
  108. def test_decompress_no_compression(self):
  109. with pytest.raises(RuntimeError):
  110. utils._decompress("foo.tar")
  111. def test_decompress_remove_finished(self, tmpdir):
  112. def create_compressed(root, content="this is the content"):
  113. file = os.path.join(root, "file")
  114. compressed = f"{file}.gz"
  115. with gzip.open(compressed, "wb") as fh:
  116. fh.write(content.encode())
  117. return compressed, file, content
  118. compressed, file, content = create_compressed(tmpdir)
  119. utils.extract_archive(compressed, tmpdir, remove_finished=True)
  120. assert not os.path.exists(compressed)
  121. @pytest.mark.parametrize("extension", [".gz", ".xz"])
  122. @pytest.mark.parametrize("remove_finished", [True, False])
  123. def test_extract_archive_defer_to_decompress(self, extension, remove_finished, mocker):
  124. filename = "foo"
  125. file = f"{filename}{extension}"
  126. mocked = mocker.patch("torchvision.datasets.utils._decompress")
  127. utils.extract_archive(file, remove_finished=remove_finished)
  128. mocked.assert_called_once_with(file, filename, remove_finished=remove_finished)
  129. def test_extract_zip(self, tmpdir):
  130. def create_archive(root, content="this is the content"):
  131. file = os.path.join(root, "dst.txt")
  132. archive = os.path.join(root, "archive.zip")
  133. with zipfile.ZipFile(archive, "w") as zf:
  134. zf.writestr(os.path.basename(file), content)
  135. return archive, file, content
  136. archive, file, content = create_archive(tmpdir)
  137. utils.extract_archive(archive, tmpdir)
  138. assert os.path.exists(file)
  139. with open(file) as fh:
  140. assert fh.read() == content
  141. @pytest.mark.parametrize(
  142. "extension, mode", [(".tar", "w"), (".tar.gz", "w:gz"), (".tgz", "w:gz"), (".tar.xz", "w:xz")]
  143. )
  144. def test_extract_tar(self, extension, mode, tmpdir):
  145. def create_archive(root, extension, mode, content="this is the content"):
  146. src = os.path.join(root, "src.txt")
  147. dst = os.path.join(root, "dst.txt")
  148. archive = os.path.join(root, f"archive{extension}")
  149. with open(src, "w") as fh:
  150. fh.write(content)
  151. with tarfile.open(archive, mode=mode) as fh:
  152. fh.add(src, arcname=os.path.basename(dst))
  153. return archive, dst, content
  154. archive, file, content = create_archive(tmpdir, extension, mode)
  155. utils.extract_archive(archive, tmpdir)
  156. assert os.path.exists(file)
  157. with open(file) as fh:
  158. assert fh.read() == content
  159. def test_verify_str_arg(self):
  160. assert "a" == utils.verify_str_arg("a", "arg", ("a",))
  161. pytest.raises(ValueError, utils.verify_str_arg, 0, ("a",), "arg")
  162. pytest.raises(ValueError, utils.verify_str_arg, "b", ("a",), "arg")
  163. @pytest.mark.parametrize(
  164. ("dtype", "actual_hex", "expected_hex"),
  165. [
  166. (torch.uint8, "01 23 45 67 89 AB CD EF", "01 23 45 67 89 AB CD EF"),
  167. (torch.float16, "01 23 45 67 89 AB CD EF", "23 01 67 45 AB 89 EF CD"),
  168. (torch.int32, "01 23 45 67 89 AB CD EF", "67 45 23 01 EF CD AB 89"),
  169. (torch.float64, "01 23 45 67 89 AB CD EF", "EF CD AB 89 67 45 23 01"),
  170. ],
  171. )
  172. def test_flip_byte_order(self, dtype, actual_hex, expected_hex):
  173. def to_tensor(hex):
  174. return torch.frombuffer(bytes.fromhex(hex), dtype=dtype)
  175. assert_equal(
  176. utils._flip_byte_order(to_tensor(actual_hex)),
  177. to_tensor(expected_hex),
  178. )
  179. @pytest.mark.parametrize(
  180. ("kwargs", "expected_error_msg"),
  181. [
  182. (dict(is_valid_file=lambda path: pathlib.Path(path).suffix in {".png", ".jpeg"}), "classes c"),
  183. (dict(extensions=".png"), re.escape("classes b, c. Supported extensions are: .png")),
  184. (dict(extensions=(".png", ".jpeg")), re.escape("classes c. Supported extensions are: .png, .jpeg")),
  185. ],
  186. )
  187. def test_make_dataset_no_valid_files(tmpdir, kwargs, expected_error_msg):
  188. tmpdir = pathlib.Path(tmpdir)
  189. (tmpdir / "a").mkdir()
  190. (tmpdir / "a" / "a.png").touch()
  191. (tmpdir / "b").mkdir()
  192. (tmpdir / "b" / "b.jpeg").touch()
  193. (tmpdir / "c").mkdir()
  194. (tmpdir / "c" / "c.unknown").touch()
  195. with pytest.raises(FileNotFoundError, match=expected_error_msg):
  196. make_dataset(str(tmpdir), **kwargs)
  197. if __name__ == "__main__":
  198. pytest.main([__file__])