archive_util.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """Utilities for extracting common archive formats"""
  2. import zipfile
  3. import tarfile
  4. import os
  5. import shutil
  6. import posixpath
  7. import contextlib
  8. from distutils.errors import DistutilsError
  9. from ._path import ensure_directory
  10. __all__ = [
  11. "unpack_archive",
  12. "unpack_zipfile",
  13. "unpack_tarfile",
  14. "default_filter",
  15. "UnrecognizedFormat",
  16. "extraction_drivers",
  17. "unpack_directory",
  18. ]
  19. class UnrecognizedFormat(DistutilsError):
  20. """Couldn't recognize the archive type"""
  21. def default_filter(src, dst):
  22. """The default progress/filter callback; returns True for all files"""
  23. return dst
  24. def unpack_archive(filename, extract_dir, progress_filter=default_filter, drivers=None):
  25. """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
  26. `progress_filter` is a function taking two arguments: a source path
  27. internal to the archive ('/'-separated), and a filesystem path where it
  28. will be extracted. The callback must return the desired extract path
  29. (which may be the same as the one passed in), or else ``None`` to skip
  30. that file or directory. The callback can thus be used to report on the
  31. progress of the extraction, as well as to filter the items extracted or
  32. alter their extraction paths.
  33. `drivers`, if supplied, must be a non-empty sequence of functions with the
  34. same signature as this function (minus the `drivers` argument), that raise
  35. ``UnrecognizedFormat`` if they do not support extracting the designated
  36. archive type. The `drivers` are tried in sequence until one is found that
  37. does not raise an error, or until all are exhausted (in which case
  38. ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
  39. drivers, the module's ``extraction_drivers`` constant will be used, which
  40. means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
  41. order.
  42. """
  43. for driver in drivers or extraction_drivers:
  44. try:
  45. driver(filename, extract_dir, progress_filter)
  46. except UnrecognizedFormat:
  47. continue
  48. else:
  49. return
  50. else:
  51. raise UnrecognizedFormat("Not a recognized archive type: %s" % filename)
  52. def unpack_directory(filename, extract_dir, progress_filter=default_filter):
  53. """ "Unpack" a directory, using the same interface as for archives
  54. Raises ``UnrecognizedFormat`` if `filename` is not a directory
  55. """
  56. if not os.path.isdir(filename):
  57. raise UnrecognizedFormat("%s is not a directory" % filename)
  58. paths = {
  59. filename: ('', extract_dir),
  60. }
  61. for base, dirs, files in os.walk(filename):
  62. src, dst = paths[base]
  63. for d in dirs:
  64. paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
  65. for f in files:
  66. target = os.path.join(dst, f)
  67. target = progress_filter(src + f, target)
  68. if not target:
  69. # skip non-files
  70. continue
  71. ensure_directory(target)
  72. f = os.path.join(base, f)
  73. shutil.copyfile(f, target)
  74. shutil.copystat(f, target)
  75. def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
  76. """Unpack zip `filename` to `extract_dir`
  77. Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
  78. by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
  79. of the `progress_filter` argument.
  80. """
  81. if not zipfile.is_zipfile(filename):
  82. raise UnrecognizedFormat("%s is not a zip file" % (filename,))
  83. with zipfile.ZipFile(filename) as z:
  84. _unpack_zipfile_obj(z, extract_dir, progress_filter)
  85. def _unpack_zipfile_obj(zipfile_obj, extract_dir, progress_filter=default_filter):
  86. """Internal/private API used by other parts of setuptools.
  87. Similar to ``unpack_zipfile``, but receives an already opened :obj:`zipfile.ZipFile`
  88. object instead of a filename.
  89. """
  90. for info in zipfile_obj.infolist():
  91. name = info.filename
  92. # don't extract absolute paths or ones with .. in them
  93. if name.startswith('/') or '..' in name.split('/'):
  94. continue
  95. target = os.path.join(extract_dir, *name.split('/'))
  96. target = progress_filter(name, target)
  97. if not target:
  98. continue
  99. if name.endswith('/'):
  100. # directory
  101. ensure_directory(target)
  102. else:
  103. # file
  104. ensure_directory(target)
  105. data = zipfile_obj.read(info.filename)
  106. with open(target, 'wb') as f:
  107. f.write(data)
  108. unix_attributes = info.external_attr >> 16
  109. if unix_attributes:
  110. os.chmod(target, unix_attributes)
  111. def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
  112. """Resolve any links and extract link targets as normal files."""
  113. while tar_member_obj is not None and (
  114. tar_member_obj.islnk() or tar_member_obj.issym()
  115. ):
  116. linkpath = tar_member_obj.linkname
  117. if tar_member_obj.issym():
  118. base = posixpath.dirname(tar_member_obj.name)
  119. linkpath = posixpath.join(base, linkpath)
  120. linkpath = posixpath.normpath(linkpath)
  121. tar_member_obj = tar_obj._getmember(linkpath)
  122. is_file_or_dir = tar_member_obj is not None and (
  123. tar_member_obj.isfile() or tar_member_obj.isdir()
  124. )
  125. if is_file_or_dir:
  126. return tar_member_obj
  127. raise LookupError('Got unknown file type')
  128. def _iter_open_tar(tar_obj, extract_dir, progress_filter):
  129. """Emit member-destination pairs from a tar archive."""
  130. # don't do any chowning!
  131. tar_obj.chown = lambda *args: None
  132. with contextlib.closing(tar_obj):
  133. for member in tar_obj:
  134. name = member.name
  135. # don't extract absolute paths or ones with .. in them
  136. if name.startswith('/') or '..' in name.split('/'):
  137. continue
  138. prelim_dst = os.path.join(extract_dir, *name.split('/'))
  139. try:
  140. member = _resolve_tar_file_or_dir(tar_obj, member)
  141. except LookupError:
  142. continue
  143. final_dst = progress_filter(name, prelim_dst)
  144. if not final_dst:
  145. continue
  146. if final_dst.endswith(os.sep):
  147. final_dst = final_dst[:-1]
  148. yield member, final_dst
  149. def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
  150. """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
  151. Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
  152. by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
  153. of the `progress_filter` argument.
  154. """
  155. try:
  156. tarobj = tarfile.open(filename)
  157. except tarfile.TarError as e:
  158. raise UnrecognizedFormat(
  159. "%s is not a compressed or uncompressed tar file" % (filename,)
  160. ) from e
  161. for member, final_dst in _iter_open_tar(
  162. tarobj,
  163. extract_dir,
  164. progress_filter,
  165. ):
  166. try:
  167. # XXX Ugh
  168. tarobj._extract_member(member, final_dst)
  169. except tarfile.ExtractError:
  170. # chown/chmod/mkfifo/mknode/makedev failed
  171. pass
  172. return True
  173. extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile