123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- # update: 2021-5-14-23
- from libraries import base_original as std
- from Crypto.Cipher import AES
- import base64
- import pyminizip
- import shutil
- import os
class _AES(object):
    """AES helper that zero-pads its input and exchanges ciphertext as base64 text.

    Keys and plaintext are encoded with ``code_type`` and right-padded with
    NUL bytes up to a multiple of 16 bytes.  Because the padding is plain NUL
    bytes, NUL characters at the very end of a plaintext cannot be told apart
    from padding and are dropped again by ``decrypt``.
    """

    # NOTE(review): the default key is hard-coded in the source; callers that
    # need confidentiality should always pass their own key.
    def __init__(self, key='7486E0264E999881D4EF7BEEDF05A9F7', model='ECB', iv='',
                 code_type='utf-8'):
        """
        key: text key; encoded and NUL-padded by ``replenish``
        model: 'ECB' or 'CBC' (any other value raises KeyError)
        iv: initialisation vector for CBC, given as bytes or text; unused for ECB
        code_type: utf-8/gbk
        """
        self.code_type = code_type
        self.model = {'ECB': AES.MODE_ECB, 'CBC': AES.MODE_CBC}[model]
        self.key = self.replenish(key)
        self._use_iv = model == 'CBC'
        # The cipher expects a bytes IV, so a text IV is encoded first.
        self.iv = iv.encode(code_type) if isinstance(iv, str) else iv
        # --- create aes object ---
        # Kept as an attribute for existing callers and so that a bad key/IV
        # still fails at construction time; encrypt()/decrypt() build their own.
        self.aes = self._new_cipher()

    def _new_cipher(self):
        """Return a fresh cipher object for one encrypt or decrypt operation.

        A CBC cipher object carries chaining state from call to call, so
        sharing one object between operations would corrupt later results.
        """
        if self._use_iv:
            return AES.new(self.key, self.model, self.iv)
        return AES.new(self.key, self.model)

    def replenish(self, block, block_size=16):
        """Encode ``block`` and right-pad it with NUL bytes to a multiple of ``block_size``.

        block_size: positive int.  With the default of 16 a key always ends
        up 16, 32, ... bytes long (a 24-byte key is padded to 32 bytes).
        """
        block = block.encode(self.code_type)
        return block + b'\x00' * (-len(block) % block_size)

    @staticmethod
    def _strip_padding(text):
        """Remove the trailing NUL padding added by ``replenish``; leading NULs are kept."""
        return text.rstrip('\0')

    def encrypt(self, text):
        """Encrypt ``text`` and return the ciphertext as base64 text."""
        padded = self.replenish(text)
        encrypted = self._new_cipher().encrypt(padded)
        return base64.encodebytes(encrypted).decode().strip()

    def decrypt(self, text):
        """Decrypt base64 text produced by ``encrypt`` and return the plaintext."""
        raw = base64.decodebytes(text.encode(self.code_type))
        plain = self._new_cipher().decrypt(raw)
        return self._strip_padding(plain.decode(self.code_type))
def text_to_aes(text, key='YourPassword'):
    """AES-encrypt ``text`` under ``key`` and pass the result through ``std.text_to_b64``.

    The cipher is built with ``_AES(key=key)``, i.e. with that class's other
    defaults.  ``std.text_to_b64`` is a project helper (presumably a base64
    text encoder -- confirm against ``libraries.base_original``).
    """
    cipher_text = _AES(key=key).encrypt(text)
    return std.text_to_b64(cipher_text)
def aes_to_text(text, key='YourPassword'):
    """Reverse ``text_to_aes``: undo ``std.text_to_b64`` and AES-decrypt with ``key``.

    ``std.b64_to_text`` is a project helper (presumably a base64 text
    decoder -- confirm against ``libraries.base_original``).
    """
    cipher_text = std.b64_to_text(text)
    return _AES(key=key).decrypt(cipher_text)
def file_to_zip_v2_2(file_path, output_dir, key='<YourPassword@2021>', compress_level=1):
    """Pack a file into a password-protected zip in ``output_dir``; the source file is kept.

    The archive gets two members: a copy of the file named after the value
    returned by ``std.get_big_file_md5`` (presumably a 32-character hex MD5,
    which is what the 37-character name check below expects) and a small
    "name file" holding the AES-encrypted original file name.  The archive is
    written to ``<output_dir>/<file md5>.<last 4 chars of the name-file md5>``.

    Args:
        file_path: path of the file to pack.
        output_dir: existing directory that receives the archive.
        key: password used for both the file-name encryption and the zip.
        compress_level: 1 (fastest) to 9 (smallest), handed to pyminizip.

    Returns:
        dict with ``code`` and ``details``:
        0  -- archive written;
        1  -- ``file_path`` is not a file;
        2  -- ``output_dir`` is not a directory;
        3  -- the file name already has the archive shape (32 chars, '.',
              4 chars); the file is copied to ``output_dir`` unchanged;
        -1 -- compression failed, ``details`` carries the traceback.

    Note: the two staging files are created directly in ``output_dir`` and
    removed afterwards, so an existing file with the same name there would be
    overwritten and deleted.
    """
    # --- check parameter ---
    if not os.path.isfile(file_path):
        return dict(code=1, details='parameter error!')
    if not os.path.isdir(output_dir):
        return dict(code=2, details='parameter error!')
    # --- check file name ---
    file_name = os.path.basename(file_path)
    is_encrypted = len(file_name) == 37 and file_name[32] == '.'
    if is_encrypted:
        try:
            shutil.copy(file_path, os.path.join(output_dir, file_name))
        except shutil.SameFileError:
            # The archive already lives in output_dir; nothing to copy.
            pass
        return dict(code=3, details='parameter error!')
    name_file_path = None
    data_file_path = None
    try:
        # --- create name file ---
        file_name_aes = text_to_aes(file_name, key=key)
        file_name_md5 = std.byte_to_md5(std.text_to_byte(file_name_aes))
        name_file_path = os.path.join(output_dir, file_name_md5[-4:])
        with open(name_file_path, 'w') as f1:
            f1.write(file_name_aes)
        # --- stage a copy of the data under its md5 name ---
        file_md5 = std.get_big_file_md5(file_path)
        data_file_path = os.path.join(output_dir, file_md5)
        shutil.copy(file_path, data_file_path)
        # --- define zip name ---
        zip_path = os.path.join(output_dir, f"{file_md5}.{file_name_md5[-4:]}")
        try:
            # pyminizip.compress_multiple arguments:
            #   1. list of source file paths
            #   2. list of prefix paths inside the archive (or [])
            #   3. destination zip path
            #   4. password, or None for an unprotected zip
            #   5. compress level, 1 (fast) .. 9 (small)
            pyminizip.compress_multiple([data_file_path, name_file_path], ['', ''],
                                        zip_path, key, compress_level)
        except Exception:
            import traceback
            details = traceback.format_exc()
            print(details)
            return dict(code=-1, details=details)
        return dict(code=0, details='ended.')
    finally:
        # --- remove staging files on success, failure and unexpected errors ---
        for staged_path in (data_file_path, name_file_path):
            if staged_path is not None and os.path.exists(staged_path):
                os.remove(staged_path)
def _files_directly_in(dir_path):
    """Return the set of paths of all non-directory entries directly inside ``dir_path``."""
    return {
        os.path.join(dir_path, name)
        for name in os.listdir(dir_path)
        if not os.path.isdir(os.path.join(dir_path, name))
    }


def zip_to_file_v2_2(file_path, key='<YourPassword>', no_path=True):
    """Unpack an archive made by ``file_to_zip_v2_2`` next to itself and restore the file name.

    The archive is extracted into its own directory.  The two files that
    appear there are expected to be a 4-character "name file" (holding the
    AES-encrypted original name) and a 32-character data file; the data file
    is renamed to the decrypted name, the name file is deleted and finally
    the archive itself is deleted.

    TODO: keep the archive and extract into a caller-chosen directory.

    Args:
        file_path: path of the archive; its base name must be 32 characters,
            a '.', then 4 characters.
        key: password for the zip and for decrypting the stored file name.
            NOTE(review): this default differs from the default key of
            ``file_to_zip_v2_2`` ('<YourPassword@2021>'), so archives made
            with that default need an explicit ``key`` here.
        no_path: handed to pyminizip as its "withoutpath" flag.

    Returns:
        dict with ``code`` and ``details``:
        0  -- file restored and archive removed;
        1  -- ``file_path`` is not a file;
        2  -- the file name does not have the archive shape;
        -1 -- extraction or restoring failed, ``details`` carries the traceback.

    On failure every file that appeared in the directory since the call
    started is deleted again and the archive is left in place.
    """
    # --- check parameter ---
    if not os.path.isfile(file_path):
        return dict(code=1, details='parameter error!')
    # --- check again ---
    file_name = os.path.basename(file_path)
    is_encrypted = len(file_name) == 37 and file_name[32] == '.'
    if not is_encrypted:
        return dict(code=2, details='parameter error!')
    # --- record ---
    # A bare file name has no directory part; fall back to the current directory.
    dir_path = os.path.dirname(file_path) or '.'
    before_files = _files_directly_in(dir_path)
    try:
        # pyminizip.uncompress arguments:
        #   1. source zip path
        #   2. password, or None
        #   3. directory to extract into, or None for the cwd
        #   4. withoutpath (drop the paths stored in the archive)
        pyminizip.uncompress(file_path, key, dir_path, no_path)
        # --- record ---
        extracted = _files_directly_in(dir_path) - before_files
        # --- check ---
        if len(extracted) != 2:
            raise Exception('something is wrong!')
        name_paths = [path for path in extracted if len(os.path.basename(path)) == 4]
        data_paths = [path for path in extracted if len(os.path.basename(path)) == 32]
        if len(name_paths) != 1 or len(data_paths) != 1:
            # Never delete the archive unless both expected members are present.
            raise Exception('something is wrong!')
        # --- get file name ---
        with open(name_paths[0], 'r') as f1:
            raw_name = aes_to_text(f1.read(), key=key)
        # The restored name must be a plain file name inside dir_path.
        if raw_name in ('', '.', '..') or os.path.basename(raw_name) != raw_name:
            raise Exception('something is wrong!')
        os.remove(name_paths[0])
        # --- rename ---
        os.rename(data_paths[0], os.path.join(dir_path, raw_name))
        # --- remove file ---
        os.remove(file_path)
        return dict(code=0, details='ended.')
    except Exception:
        import traceback
        details = traceback.format_exc()
        # --- revert ---
        for path in _files_directly_in(dir_path) - before_files:
            os.remove(path)
        print(details)
        return dict(code=-1, details=details)
|