# update: 2024-3-31-10
"""File helpers: raw/text/pickle I/O, MD5 digests, size queries and path utilities.

Quick reference for the ``mode`` argument of ``open``:

    r    read-only (the default); raises if the file does not exist
    w    write-only; creates the file if it does not exist; only ``str`` can be written
    r+   read and write; raises if the file does not exist
    w+   read and write; creates the file if it does not exist
    a    append; not readable
    a+   append; readable and writable
    rb   binary read-only
    rb+  binary read and write; raises if the file does not exist
    wb   binary write-only
    wb+  binary read and write; creates the file if it does not exist
    ab   binary append; not readable
    ab+  binary append; readable and writable
"""
import hashlib
import base64
import pickle
import sys
import os

# Exponent of 1024 for every supported size unit.
_UNIT_EXPONENTS = {
    'GB': 3,
    'MB': 2,
    'KB': 1,
    'B': 0,
}


def read_bytes(file_path):
    """Return the whole content of ``file_path`` as ``bytes``."""
    with open(file_path, 'rb') as f:
        return f.read()


def read_text(file_path, encoding=None):
    """Return the whole content of ``file_path`` as ``str``.

    ``encoding`` is forwarded to ``open``; ``None`` keeps the platform default.
    """
    with open(file_path, 'r', encoding=encoding) as f:
        return f.read()


def write_bytes(path, data=b''):
    """Write ``data`` (bytes) to ``path``, replacing any existing content."""
    with open(path, 'wb') as f:
        f.write(data)


def write_text(path, data='', mode='w', encoding=None):
    """Write ``data`` (str) to ``path`` using ``mode`` (``'w'`` or e.g. ``'a'``).

    ``encoding`` is forwarded to ``open``; ``None`` keeps the platform default.
    """
    with open(path, mode, encoding=encoding) as f:
        f.write(data)


def write_file(path, data=None, mode='wb'):
    """Write ``data`` to ``path`` with an arbitrary ``mode``.

    ``data=None`` writes nothing (an empty payload of the type matching the
    mode) instead of failing inside ``f.write(None)``.
    """
    if data is None:
        data = b'' if 'b' in mode else ''
    with open(path, mode) as f:
        f.write(data)


def load_pickle_file(file_path):
    """Load and return the object pickled in ``file_path``.

    NOTE(security): unpickling can execute arbitrary code; only use this on
    files from a trusted source.
    """
    # Plain 'rb' is enough for loading; 'rb+' needlessly required write access.
    with open(file_path, 'rb') as f:
        return pickle.load(f)


def save_pickle_file(file_path, data):
    """Pickle ``data`` into ``file_path``, replacing any existing content."""
    with open(file_path, 'wb') as f:
        pickle.dump(data, f)


def get_file_md5(file_path):
    """Return the hex MD5 digest of the file content."""
    # Streamed so that large files do not have to fit in memory.
    return get_big_file_md5(file_path)


def get_big_file_md5(file_path, block_size=8 * 1024):
    """Return the hex MD5 digest of the file, read in ``block_size`` chunks (8 KB by default)."""
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            m.update(block)
    return m.hexdigest()


def _file_iterator(file_object, block_size=8 * 1024):
    """Yield successive ``block_size`` chunks of ``file_object`` and close it when exhausted."""
    with file_object:
        block = file_object.read(block_size)
        while len(block) > 0:
            yield block
            block = file_object.read(block_size)


def get_big_file_md5_v2(file_path):
    """Return the hex MD5 digest of the file, read in 8 KB chunks via ``_file_iterator``."""
    m = hashlib.md5()
    # The outer ``with`` guarantees the handle is closed even if hashing fails
    # before the generator is exhausted.
    with open(file_path, 'rb') as file_object:
        for block in _file_iterator(file_object):
            m.update(block)
    return m.hexdigest()


def get_file_b64_md5(file_path):
    """Return the hex MD5 digest of the base64 encoding of the file content."""
    # Streamed so that large files do not have to fit in memory.
    return get_big_file_b64_md5(file_path)


def get_big_file_b64_md5(file_path, block_size=3 * 1024 * 1024):
    """Return the hex MD5 digest of the base64 encoding of the file, streamed.

    The file is read in chunks of about ``block_size`` bytes (3 MB by default)
    and hashed on the fly; no intermediate ``.b64`` file is written.
    """
    # base64 turns every 3 input bytes into 4 output characters, so chunks
    # must be a multiple of 3 bytes; otherwise padding would appear in the
    # middle of the stream and the digest would differ from the whole-file one.
    chunk_size = max(3, block_size - block_size % 3)
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(chunk_size), b''):
            m.update(base64.b64encode(block))
    return m.hexdigest()


def _scale_bytes(byte_count, unit):
    """Convert ``byte_count`` to ``unit`` (GB/MB/KB/B), rounded to one decimal."""
    key = str(unit).upper()
    if key not in _UNIT_EXPONENTS:
        raise ValueError(f"unsupported unit {unit!r}; expected one of GB/MB/KB/B")
    return float(round(byte_count / (1024 ** _UNIT_EXPONENTS[key]), 1))


def get_var_size(object, unit='MB'):
    """Return ``sys.getsizeof(object)`` expressed in ``unit``.

    unit: GB/MB/KB/B (case-insensitive). Raises ``ValueError`` for any other unit.
    """
    return _scale_bytes(sys.getsizeof(object), unit)


def get_file_size(file_path, unit='MB'):
    """Return the size of ``file_path`` expressed in ``unit``, or ``None`` if it is not a file.

    unit: GB/MB/KB/B (case-insensitive). Raises ``ValueError`` for any other unit.
    """
    if not is_file(file_path):
        return None
    return _scale_bytes(os.path.getsize(file_path), unit)


def get_file_path_list(dir_path='/root', path_list=None):
    """Collect the paths of all files below ``dir_path``, recursively.

    ``path_list`` is an optional list to extend in place; paths already in it
    are not added again. The (possibly new) list is returned.

    Example:
        get_file_path_list('/opt')
        ['/opt/1.txt', '/opt/2.txt']
    """
    if path_list is None:
        path_list = []
    # Set for O(1) duplicate checks; the list keeps the discovery order.
    seen = set(path_list)
    # os.walk already descends into every sub-directory, so no extra
    # recursion is needed.
    for path, _folders, files in os.walk(dir_path):
        for file_name in files:
            file_path = os.path.join(path, file_name)
            if file_path in seen:
                continue
            seen.add(file_path)
            path_list.append(file_path)
    return path_list


def mkdir(dir_path):
    """Create ``dir_path`` and any missing parent directories.

    Every path component, including the last one, is created as a directory.
    A relative path is resolved against ``get_pwd()``. Returns ``False`` for
    an empty path, ``True`` otherwise.

    Example:
        mkdir('/share/abc/xyz/1.txt')
        True
    """
    if not dir_path:
        return False
    if not os.path.isabs(dir_path):
        dir_path = os.path.join(get_pwd(), dir_path)
    # exist_ok avoids the check-then-create race of a manual isdir()/mkdir() loop.
    os.makedirs(dir_path, exist_ok=True)
    return True


def get_pwd():
    """Return the directory in ``sys.path[0]``, falling back to the working directory.

    When ``sys.path[0]`` is not a directory its parent directory is used; when
    that leaves nothing usable, ``os.getcwd()`` is returned.
    """
    path = sys.path[0] if sys.path else ''
    if path and not os.path.isdir(path):
        path = os.path.dirname(path)
    return path or os.getcwd()


def is_file(file_path):
    """Return ``True`` if ``file_path`` is an existing regular file."""
    return os.path.isfile(file_path)


def is_dir(file_path):
    """Return ``True`` if ``file_path`` is an existing directory."""
    return os.path.isdir(file_path)


def remove_file(file_path):
    """Delete the file at ``file_path`` (thin wrapper around ``os.remove``)."""
    return os.remove(file_path)


def move_file(s_path, d_path):
    """Move ``s_path`` to ``d_path`` (thin wrapper around ``os.rename``)."""
    return os.rename(s_path, d_path)


if __name__ == '__main__':
    # --- test ---
    # Lists every file below a directory; pass the directory as the first
    # command-line argument to override the sample path below.
    file_dir = r'E:\casper\repositories\repositories\SRI-DYZBC.Cockpit-cpp\EgoQt'
    if len(sys.argv) > 1:
        file_dir = sys.argv[1]
    path_list = get_file_path_list(file_dir)
    print(path_list)