readers.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import collections
  2. import contextlib
  3. import itertools
  4. import pathlib
  5. import operator
  6. import re
  7. from . import abc
  8. from ._itertools import only
  9. from ._compat import ZipPath, ensure_traversable
  10. def remove_duplicates(items):
  11. return iter(collections.OrderedDict.fromkeys(items))
  12. class FileReader(abc.TraversableResources):
  13. def __init__(self, loader):
  14. self.path = pathlib.Path(loader.path).parent
  15. def resource_path(self, resource):
  16. """
  17. Return the file system path to prevent
  18. `resources.path()` from creating a temporary
  19. copy.
  20. """
  21. return str(self.path.joinpath(resource))
  22. def files(self):
  23. return self.path
  24. class ZipReader(abc.TraversableResources):
  25. def __init__(self, loader, module):
  26. _, _, name = module.rpartition('.')
  27. self.prefix = loader.prefix.replace('\\', '/') + name + '/'
  28. self.archive = loader.archive
  29. def open_resource(self, resource):
  30. try:
  31. return super().open_resource(resource)
  32. except KeyError as exc:
  33. raise FileNotFoundError(exc.args[0])
  34. def is_resource(self, path):
  35. """
  36. Workaround for `zipfile.Path.is_file` returning true
  37. for non-existent paths.
  38. """
  39. target = self.files().joinpath(path)
  40. return target.is_file() and target.exists()
  41. def files(self):
  42. return ZipPath(self.archive, self.prefix)
  43. class MultiplexedPath(abc.Traversable):
  44. """
  45. Given a series of Traversable objects, implement a merged
  46. version of the interface across all objects. Useful for
  47. namespace packages which may be multihomed at a single
  48. name.
  49. """
  50. def __init__(self, *paths):
  51. self._paths = list(map(ensure_traversable, remove_duplicates(paths)))
  52. if not self._paths:
  53. message = 'MultiplexedPath must contain at least one path'
  54. raise FileNotFoundError(message)
  55. if not all(path.is_dir() for path in self._paths):
  56. raise NotADirectoryError('MultiplexedPath only supports directories')
  57. def iterdir(self):
  58. children = (child for path in self._paths for child in path.iterdir())
  59. by_name = operator.attrgetter('name')
  60. groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
  61. return map(self._follow, (locs for name, locs in groups))
  62. def read_bytes(self):
  63. raise FileNotFoundError(f'{self} is not a file')
  64. def read_text(self, *args, **kwargs):
  65. raise FileNotFoundError(f'{self} is not a file')
  66. def is_dir(self):
  67. return True
  68. def is_file(self):
  69. return False
  70. def joinpath(self, *descendants):
  71. try:
  72. return super().joinpath(*descendants)
  73. except abc.TraversalError:
  74. # One of the paths did not resolve (a directory does not exist).
  75. # Just return something that will not exist.
  76. return self._paths[0].joinpath(*descendants)
  77. @classmethod
  78. def _follow(cls, children):
  79. """
  80. Construct a MultiplexedPath if needed.
  81. If children contains a sole element, return it.
  82. Otherwise, return a MultiplexedPath of the items.
  83. Unless one of the items is not a Directory, then return the first.
  84. """
  85. subdirs, one_dir, one_file = itertools.tee(children, 3)
  86. try:
  87. return only(one_dir)
  88. except ValueError:
  89. try:
  90. return cls(*subdirs)
  91. except NotADirectoryError:
  92. return next(one_file)
  93. def open(self, *args, **kwargs):
  94. raise FileNotFoundError(f'{self} is not a file')
  95. @property
  96. def name(self):
  97. return self._paths[0].name
  98. def __repr__(self):
  99. paths = ', '.join(f"'{path}'" for path in self._paths)
  100. return f'MultiplexedPath({paths})'
  101. class NamespaceReader(abc.TraversableResources):
  102. def __init__(self, namespace_path):
  103. if 'NamespacePath' not in str(namespace_path):
  104. raise ValueError('Invalid path')
  105. self.path = MultiplexedPath(*map(self._resolve, namespace_path))
  106. @classmethod
  107. def _resolve(cls, path_str) -> abc.Traversable:
  108. r"""
  109. Given an item from a namespace path, resolve it to a Traversable.
  110. path_str might be a directory on the filesystem or a path to a
  111. zipfile plus the path within the zipfile, e.g. ``/foo/bar`` or
  112. ``/foo/baz.zip/inner_dir`` or ``foo\baz.zip\inner_dir\sub``.
  113. """
  114. (dir,) = (cand for cand in cls._candidate_paths(path_str) if cand.is_dir())
  115. return dir
  116. @classmethod
  117. def _candidate_paths(cls, path_str):
  118. yield pathlib.Path(path_str)
  119. yield from cls._resolve_zip_path(path_str)
  120. @staticmethod
  121. def _resolve_zip_path(path_str):
  122. for match in reversed(list(re.finditer(r'[\\/]', path_str))):
  123. with contextlib.suppress(
  124. FileNotFoundError, IsADirectoryError, PermissionError
  125. ):
  126. inner = path_str[match.end() :].replace('\\', '/') + '/'
  127. yield ZipPath(path_str[: match.start()], inner.lstrip('/'))
  128. def resource_path(self, resource):
  129. """
  130. Return the file system path to prevent
  131. `resources.path()` from creating a temporary
  132. copy.
  133. """
  134. return str(self.path.joinpath(resource))
  135. def files(self):
  136. return self.path