readers.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import collections
  2. import pathlib
  3. import operator
  4. from . import abc
  5. from ._itertools import unique_everseen
  6. from ._compat import ZipPath
  7. def remove_duplicates(items):
  8. return iter(collections.OrderedDict.fromkeys(items))
  9. class FileReader(abc.TraversableResources):
  10. def __init__(self, loader):
  11. self.path = pathlib.Path(loader.path).parent
  12. def resource_path(self, resource):
  13. """
  14. Return the file system path to prevent
  15. `resources.path()` from creating a temporary
  16. copy.
  17. """
  18. return str(self.path.joinpath(resource))
  19. def files(self):
  20. return self.path
  21. class ZipReader(abc.TraversableResources):
  22. def __init__(self, loader, module):
  23. _, _, name = module.rpartition('.')
  24. self.prefix = loader.prefix.replace('\\', '/') + name + '/'
  25. self.archive = loader.archive
  26. def open_resource(self, resource):
  27. try:
  28. return super().open_resource(resource)
  29. except KeyError as exc:
  30. raise FileNotFoundError(exc.args[0])
  31. def is_resource(self, path):
  32. # workaround for `zipfile.Path.is_file` returning true
  33. # for non-existent paths.
  34. target = self.files().joinpath(path)
  35. return target.is_file() and target.exists()
  36. def files(self):
  37. return ZipPath(self.archive, self.prefix)
  38. class MultiplexedPath(abc.Traversable):
  39. """
  40. Given a series of Traversable objects, implement a merged
  41. version of the interface across all objects. Useful for
  42. namespace packages which may be multihomed at a single
  43. name.
  44. """
  45. def __init__(self, *paths):
  46. self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
  47. if not self._paths:
  48. message = 'MultiplexedPath must contain at least one path'
  49. raise FileNotFoundError(message)
  50. if not all(path.is_dir() for path in self._paths):
  51. raise NotADirectoryError('MultiplexedPath only supports directories')
  52. def iterdir(self):
  53. files = (file for path in self._paths for file in path.iterdir())
  54. return unique_everseen(files, key=operator.attrgetter('name'))
  55. def read_bytes(self):
  56. raise FileNotFoundError(f'{self} is not a file')
  57. def read_text(self, *args, **kwargs):
  58. raise FileNotFoundError(f'{self} is not a file')
  59. def is_dir(self):
  60. return True
  61. def is_file(self):
  62. return False
  63. def joinpath(self, *descendants):
  64. try:
  65. return super().joinpath(*descendants)
  66. except abc.TraversalError:
  67. # One of the paths did not resolve (a directory does not exist).
  68. # Just return something that will not exist.
  69. return self._paths[0].joinpath(*descendants)
  70. def open(self, *args, **kwargs):
  71. raise FileNotFoundError(f'{self} is not a file')
  72. @property
  73. def name(self):
  74. return self._paths[0].name
  75. def __repr__(self):
  76. paths = ', '.join(f"'{path}'" for path in self._paths)
  77. return f'MultiplexedPath({paths})'
  78. class NamespaceReader(abc.TraversableResources):
  79. def __init__(self, namespace_path):
  80. if 'NamespacePath' not in str(namespace_path):
  81. raise ValueError('Invalid path')
  82. self.path = MultiplexedPath(*list(namespace_path))
  83. def resource_path(self, resource):
  84. """
  85. Return the file system path to prevent
  86. `resources.path()` from creating a temporary
  87. copy.
  88. """
  89. return str(self.path.joinpath(resource))
  90. def files(self):
  91. return self.path