zip_by_pyminizip.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # update: 2021-5-14-23
  2. from libraries import base_original as std
  3. from Crypto.Cipher import AES
  4. import base64
  5. import pyminizip
  6. import shutil
  7. import os
  8. class _AES(object):
  9. def __init__(self, key='7486E0264E999881D4EF7BEEDF05A9F7', model='ECB', iv='',
  10. code_type='utf-8'):
  11. """
  12. code_type: utf-8/gbk
  13. """
  14. self.code_type = code_type
  15. self.model = {'ECB': AES.MODE_ECB, 'CBC': AES.MODE_CBC}[model]
  16. self.key = self.replenish(key)
  17. # --- create aes object ---
  18. if model == 'ECB':
  19. self.aes = AES.new(self.key, self.model)
  20. elif model == 'CBC':
  21. self.aes = AES.new(self.key, self.model, iv)
  22. def replenish(self, block, block_size=16):
  23. """block_size: AES key must be either 16, 24, or 32 bytes long"""
  24. block = block.encode(self.code_type)
  25. while len(block) % block_size != 0:
  26. block += b'\x00'
  27. return block
  28. def encrypt(self, text):
  29. text = self.replenish(text)
  30. encrypt_text = self.aes.encrypt(text)
  31. return base64.encodebytes(encrypt_text).decode().strip()
  32. def decrypt(self, text):
  33. text = base64.decodebytes(text.encode(self.code_type))
  34. decrypt_text = self.aes.decrypt(text)
  35. return decrypt_text.decode(self.code_type).strip('\0')
  36. def text_to_aes(text, key='YourPassword'):
  37. """aes加密"""
  38. aes = _AES(key=key)
  39. return std.text_to_b64(aes.encrypt(text))
  40. def aes_to_text(text, key='YourPassword'):
  41. """aes解密"""
  42. aes = _AES(key=key)
  43. return aes.decrypt(std.b64_to_text(text))
  44. def file_to_zip_v2_2(file_path, output_dir, key='<YourPassword@2021>', compress_level=1):
  45. """file转zip(不删除加密文件,并输出到指定目录)"""
  46. # --- check parameter ---
  47. if not os.path.isfile(file_path):
  48. return dict(code=1, details='parameter error!')
  49. # --- check parameter ---
  50. if not os.path.isdir(output_dir):
  51. return dict(code=2, details='parameter error!')
  52. # --- check file name ---
  53. file_name = file_path.split('/')[-1]
  54. is_encrypted = len(file_name) == 37 and file_name[32] == '.'
  55. if is_encrypted:
  56. shutil.copy(file_path, f"{output_dir}/{file_name}")
  57. return dict(code=3, details='parameter error!')
  58. # --- create name file ---
  59. # dir_path = '/'.join(file_path.split('/')[:-1])
  60. file_name_aes = text_to_aes(file_name, key=key)
  61. file_name_md5 = std.byte_to_md5(std.text_to_byte(file_name_aes))
  62. name_file_path = f"{output_dir}/{file_name_md5[-4:]}"
  63. with open(name_file_path, 'w') as f1:
  64. f1.write(file_name_aes)
  65. # --- rename file ---
  66. file_md5 = std.get_big_file_md5(file_path)
  67. # data_file_path = f"{dir_path}/{file_md5}"
  68. # os.rename(file_path, data_file_path)
  69. data_file_path = f"{output_dir}/{file_md5}"
  70. shutil.copy(file_path, data_file_path)
  71. # --- define zip name ---
  72. zip_path = f"{output_dir}/{file_md5}.{file_name_md5[-4:]}"
  73. try:
  74. # --- writing ---
  75. """
  76. Args:
  77. 1. src file LIST path (list)
  78. 2. src file LIST prefix path (list) or []
  79. 3. dst file path (string)
  80. 4. password (string) or None (to create no-password zip)
  81. 5. compress_level(int) between 1 to 9, 1 (more fast) <---> 9 (more compress)
  82. 6. optional function to be called during processing which takes one argument, the count of how many files have been compressed
  83. """
  84. pyminizip.compress_multiple([data_file_path, name_file_path], ['', ''], zip_path, key, compress_level)
  85. # --- remove file ---
  86. os.remove(data_file_path)
  87. os.remove(name_file_path)
  88. return dict(code=0, details='ended.')
  89. except Exception as exception:
  90. # --- revert ---
  91. os.remove(name_file_path)
  92. os.remove(data_file_path)
  93. import traceback
  94. print(traceback.format_exc())
  95. return dict(code=-1, details=f"{traceback.format_exc()}")
  96. def zip_to_file_v2_2(file_path, key='<YourPassword>', no_path=True):
  97. """zip转file
  98. todo(不删除加密文件,并输出到指定目录)"""
  99. # --- check parameter ---
  100. if not os.path.isfile(file_path):
  101. return dict(code=1, details='parameter error!')
  102. # --- check again ---
  103. file_name = file_path.split('/')[-1]
  104. is_encrypted = len(file_name) == 37 and file_name[32] == '.'
  105. if not is_encrypted:
  106. return dict(code=2, details='parameter error!')
  107. # --- record ---
  108. dir_path = '/'.join(file_path.split('/')[:-1])
  109. names = os.listdir(dir_path)
  110. before_files = set(f"{dir_path}/{name}" for name in names if not os.path.isdir(f"{dir_path}/{name}"))
  111. try:
  112. # --- writing ---
  113. """
  114. Args:
  115. 1. src file path (string)
  116. 2. password (string) or None (to unzip encrypted archives)
  117. 3. dir path to extract files or None (to extract in a specific dir or cwd)
  118. 4. withoutpath (exclude path of extracted)
  119. """
  120. pyminizip.uncompress(file_path, key, dir_path, no_path)
  121. # --- record ---
  122. names = os.listdir(dir_path)
  123. after_files = set(f"{dir_path}/{name}" for name in names if not os.path.isdir(f"{dir_path}/{name}"))
  124. # --- check ---
  125. temp_file = list(after_files - before_files)
  126. if len(temp_file) != 2:
  127. raise Exception('something is wrong!')
  128. # --- get file name ---
  129. raw_name = str()
  130. for path in temp_file:
  131. name = path.split('/')[-1]
  132. if len(name) == 4:
  133. with open(path, 'r') as f1:
  134. raw_name = aes_to_text(f1.read(), key=key)
  135. os.remove(path)
  136. # --- rename ---
  137. for path in list(after_files - before_files):
  138. name = path.split('/')[-1]
  139. if len(name) == 32:
  140. os.rename(path, f"{dir_path}/{raw_name}")
  141. # --- remove file ---
  142. os.remove(file_path)
  143. return dict(code=0, details='ended.')
  144. except Exception as exception:
  145. # --- revert ---
  146. names = os.listdir(dir_path)
  147. after_files = set(f"{dir_path}/{name}" for name in names if not os.path.isdir(f"{dir_path}/{name}"))
  148. for path in list(after_files - before_files):
  149. os.remove(path)
  150. import traceback
  151. print(traceback.format_exc())
  152. return dict(code=-1, details=f"{traceback.format_exc()}")